Passed
Push — master ( 604dcd...57fbcd )
by John
03:24
created

bbarchivist.networkutils.getcode()   A

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 18
ccs 9
cts 9
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1668/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import base64  # encoding
5 5
import binascii  # encoding
6 5
import concurrent.futures  # multiprocessing/threading
7 5
import glob  # pem file lookup
8 5
import hashlib  # salt
9 5
import os  # filesystem read
10 5
import random  # salt
11 5
import re  # regexes
12 5
import time  # salt
13 5
import zlib  # encoding
14
15 5
import requests  # downloading
16 5
from bs4 import BeautifulSoup  # scraping
17 5
from bbarchivist import utilities  # parse filesize
18 5
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers
19
20 5
try:
21 5
    from defusedxml import ElementTree  # safer XML parsing
22 1
except (ImportError, AttributeError):
23 1
    from xml.etree import ElementTree  # XML parsing
24
25 5
__author__ = "Thurask"
26 5
__license__ = "WTFPL v2"
27 5
__copyright__ = "2015-2018 Thurask"
28
29
30 5
def grab_pem():
    """
    Pick the CA bundle: a cacert.pem in the working directory, else the one from requests.
    """
    candidates = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))
    if not candidates:
        return requests.certs.where()  # no local cacerts
    return os.path.abspath(candidates[0])  # local cacerts
40
41
42 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    The returned wrapper keeps the name, docstring and other metadata of the
    decorated function.

    :param method: Method to use.
    :type method: function
    """
    import functools  # local import so the block is self-contained

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        # Set REQUESTS_CA_BUNDLE on every call, then run the wrapped function.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
56
57
58 5
def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and other common exceptions.

    The wrapped function is attempted up to five times; if every attempt
    raises one of the handled requests exceptions, None is returned. The
    returned wrapper keeps the name and docstring of the decorated function.

    :param method: Method to use.
    :type method: function
    """
    import functools  # local import so the block is self-contained

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        # Try function, try it again up to five times, and leave gracefully.
        tries = 5
        for _ in range(tries):
            try:
                return method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue
        return None  # every attempt failed
    return wrapper
81
82
83 5
def generic_session(session=None):
    """
    Hand back the given Requests session, or a new one if none was given.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    if session is not None:
        return session
    return requests.Session()
92
93
94 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and wrap its content in a BeautifulSoup HTML parser.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    response = generic_session(session).get(url)
    return BeautifulSoup(response.content, "html.parser")
108
109
110 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    Returns 0 when the URL is None, when the connection fails, or when the
    server sends no usable content-length header.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if url is None:
        return 0
    session = generic_session(session)
    try:
        heads = session.head(url)
    except requests.ConnectionError:
        return 0
    try:
        # the header may be absent or not a number
        return int(heads.headers.get('content-length', 0))
    except (TypeError, ValueError):
        return 0
130
131
132 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Fetch one URL into a folder, then discard the file if it came out empty.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
    longname = url.split('/')[-1]  # last path segment is the file name
    shortname = utilities.stripper(longname)
    target = os.path.join(output_directory, longname)
    download_writer(url, target, longname, shortname, session)
    remove_empty_download(target)
153
154
155 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when its size is zero bytes.

    :param fname: File path.
    :type fname: str
    """
    if os.path.getsize(fname) == 0:
        os.remove(fname)
164
165
166 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    The output file is created before the request is made, so a failed
    download leaves an empty file behind.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)  # honour the documented None default
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        if req.status_code == 200:  # 200 OK
            # Only read the size on success, and tolerate a missing header.
            # NOTE(review): assumes utilities.fsizer accepts "0" like any
            # other header string - confirm.
            clength = req.headers.get('content-length', '0')
            fsize = utilities.fsizer(clength)
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
            for chunk in req.iter_content(chunk_size=1024):
                file.write(chunk)
        else:
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
195
196
197 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    The pool size is the smaller of the URL count and the requested worker
    count, but never less than one, so an empty URL list is a no-op rather
    than an error.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Maximum number of pool workers. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least 1
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    # leaving the with-block waits for all submitted downloads
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
226
227
228 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    Any existing download directory is removed with its contents and
    recreated, then the Windows, Linux and macOS archives are downloaded
    into it.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    import shutil  # local import so the block is self-contained

    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        # rmtree also removes a non-empty directory and leaves parents alone
        shutil.rmtree(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    # download into the directory that was just prepared
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
245
246
247 5
@pem_wrapper
def getcode(url, session=None):
    """
    Issue a HEAD request and report the HTTP status code; 404 on connection error.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    try:
        response = session.head(url)
    except requests.ConnectionError:
        return 404
    return int(response.status_code)
265
266
267 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether a URL answers with an acceptable HTTP status code.

    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    code = getcode(url, session)
    if code == 200:
        return True
    return 300 < code <= 308
281
282
283 5
def clean_availability(results, server):
    """
    Turn one server's lookup result into a (release, marker) pair.

    The marker is two spaces when the release is None or "SR not in system";
    otherwise it is "PD" for server "p" and the upper-cased server name for
    anything else.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    release = results[server.lower()]
    if release is None or release == "SR not in system":
        flag = "  "
    elif server == "p":
        flag = "PD"
    else:
        flag = server.upper()
    return release, flag
297
298
299 5
def tcl_master():
    """
    Pick one entry of TCLMASTERS at random.
    """
    chosen = random.choice(TCLMASTERS)
    return chosen
304
305
306 5
def tcl_default_id(devid):
    """
    Return the given device ID, or a stock placeholder when it is None.

    :param devid: Return default if this is None.
    :type devid: str
    """
    return "543212345000000" if devid is None else devid
316
317
318 5
def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
    """
    Build the URL and query parameters for a TCL update check.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
    :type cktp: int

    :param rtd: 2 if rooted, 1 if not. Default is 1.
    :type rtd: int

    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
    :type chnl: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    params = {
        "id": tcl_default_id(devid),
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "cltp": cltp,
        "cktp": cktp,
        "rtd": rtd,
        "chnl": chnl,
    }
    geturl = "http://{0}/check.php".format(tcl_master())
    return geturl, params
350
351
352 5
@pem_wrapper
@try_try_again
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
    """
    Ask a TCL master server whether an update exists; return the response text, or None on a non-200 status.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    geturl, params = check_prep(curef, mode, fvver)
    req = sess.get(geturl, params=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    body = req.text
    if export:
        dump_tcl_xml(body)
    return body
384
385
386 5
def parse_tcl_check(data):
    """
    Pull target version, firmware ID, file name, size and checksum out of an update check response.

    :param data: The data to parse.
    :type data: str
    """
    root = ElementTree.fromstring(data)
    tvver = root.find("VERSION").find("TV").text
    firmware = root.find("FIRMWARE")
    fwid = firmware.find("FW_ID").text
    fileinfo = firmware.find("FILESET").find("FILE")
    details = [fileinfo.find(tag).text for tag in ("FILENAME", "SIZE", "CHECKSUM")]
    return (tvver, fwid) + tuple(details)
401
402
403 5
def tcl_salt():
    """
    Build a salt string: current time in milliseconds followed by six random digits.
    """
    stamp = str(round(time.time() * 1000))
    suffix = str(random.randint(0, 999999)).zfill(6)
    return stamp + suffix
410
411
412 5
def dump_tcl_xml(xmldata):
    """
    Save an XML response as a salt-named file in a "logs" folder under the working directory.

    :param xmldata: Response XML.
    :type xmldata: str
    """
    logdir = os.path.join(os.getcwd(), "logs")
    outfile = os.path.join(logdir, "{0}.xml".format(tcl_salt()))
    if not os.path.exists(logdir):
        os.makedirs(logdir)
    with open(outfile, "w", encoding="utf-8") as afile:
        afile.write(xmldata)
424
425
426 5
def unpack_vdkey():
    """
    Decode the embedded key: base64-decode, zlib-decompress, then read as UTF-8 text.
    """
    packed = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
    compressed = binascii.a2b_base64(packed)
    return zlib.decompress(compressed).decode("utf-8")
433
434
435 5
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Compute the SHA-1 hex digest of the request query string with the unpacked key appended.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    vdk = unpack_vdkey()
    devid = tcl_default_id(devid)
    # field order is part of the hashed string, keep it as is
    fields = (
        ("id", devid),
        ("salt", salt),
        ("curef", curef),
        ("fv", fvver),
        ("tv", tvver),
        ("type", "Firmware"),
        ("fw_id", fwid),
        ("mode", mode),
        ("cltp", cltp),
    )
    query = "&".join("{0}={1}".format(key, val) for key, val in fields) + "{0}".format(vdk)
    return hashlib.sha1(bytes(query, "utf-8")).hexdigest()
469
470
471 5
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Build the URL and form parameters for a TCL download request.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    params = {
        "id": tcl_default_id(devid),
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "tv": tvver,
        "fw_id": fwid,
        "salt": salt,
        "vk": vkh,
        "cltp": cltp,
    }
    posturl = "http://{0}/download_request.php".format(tcl_master())
    if mode == 4:
        params["foot"] = 1  # only sent in autoloader mode
    return posturl, params
508
509
510 5
@pem_wrapper
@try_try_again
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
    """
    Post a download request to a TCL master server; return the response text, or None on a non-200 status.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
    req = sess.post(posturl, data=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    body = req.text
    if export:
        dump_tcl_xml(body)
    return body
554
555
556 5
def parse_tcl_download_request(body, mode=4):
    """
    Pull the full file URL and an encrypt slave host out of a download request response.

    The slave (and the encrypt slave, if any) is picked at random from the
    ones listed. The encrypt slave is None in mode 2 or when none is listed.

    :param body: The data to parse.
    :type body: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int
    """
    root = ElementTree.fromstring(body)
    slave_list = root.find("SLAVE_LIST")
    slave = random.choice(slave_list.findall("SLAVE")).text
    dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
    enc_candidates = slave_list.findall("ENCRYPT_SLAVE")
    if mode == 2 or not enc_candidates:
        encslave = None
    else:
        encslave = random.choice(enc_candidates).text
    return "http://{0}{1}".format(slave, dlurl), encslave
573
574
575 5
def encrypt_header_prep(address, encslave):
    """
    Build the URL and form parameters for an encrypted header check.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str
    """
    encoded_pairs = (
        (b"YWNjb3VudA==", b"emhlbmdodWEuZ2Fv"),
        (b"cGFzc3dvcmQ=", b"cWFydUQ0b2s="),
    )
    params = {}
    for key, val in encoded_pairs:
        params[base64.b64decode(key)] = base64.b64decode(val)
    params[b"address"] = bytes(address, "utf-8")
    posturl = "http://{0}/encrypt_header.php".format(encslave)
    return posturl, params
590
591
592 5
@pem_wrapper
def encrypt_header(address, encslave, session=None):
    """
    Post an encrypted header check and classify the answer.

    Returns None unless the status is 206; otherwise "HEADER FOUND" when the
    Content-Length is 4194320 and "NO HEADER FOUND" for any other length.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    posturl, params = encrypt_header_prep(address, encslave)
    req = sess.post(posturl, data=params)
    if req.status_code != 206:  # anything but partial content
        return None
    if int(req.headers["Content-Length"]) == 4194320:
        return "HEADER FOUND"
    return "NO HEADER FOUND"
615
616
617 5
@pem_wrapper
def remote_prd_info():
    """
    Fetch the remote JSON listing and map each curef to its last OTA version, skipping entries without one.
    """
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
    entries = requests.get(dburl).json().values()
    otadict = {}
    for entry in entries:
        last_ota = entry["last_ota"]
        if last_ota is not None:
            otadict[entry["curef"]] = last_ota
    return otadict
627
628
629 5
def cchecker_get_tags(root):
    """
    Get country and carrier from XML.

    Reads the "name" attribute of the direct "country" and "carrier" children
    of the given element. Either value is None when its child is missing.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    # start from None so a response missing either tag does not raise
    country = carrier = None
    for child in root:
        if child.tag == "country":
            country = child.get("name")
        elif child.tag == "carrier":
            carrier = child.get("name")
    return country, carrier
642
643
644 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Ask the BlackBerry World carrier check endpoint which country and carrier a MCC/MNC pair maps to.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    url = baseurl + "?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc)
    response = session.get(url, headers={'User-agent': 'AppWorld/5.1.0.60'})
    country, carrier = cchecker_get_tags(ElementTree.fromstring(response.text))
    return country, carrier
666
667
668 5
def return_npc(mcc, mnc):
    """
    Build a NPC string: MCC and MNC each zero-padded to three characters, followed by "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    mcc_part = str(mcc).zfill(3)
    mnc_part = str(mnc).zfill(3)
    return mcc_part + mnc_part + "30"
679
680
681 5
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an update detail request to BlackBerry's servers and parse the response.

    :param npc: MCC + MNC (see `func:return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    upg = "upgrade" if upgrade else "repair"
    if forced is None:
        forced = "latest"
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
    # the request body, one fragment per line, joined without separators
    fragments = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<updateDetailRequest version="2.2.1" authEchoTS="1366644680359">',
        "<clientProperties>",
        "<hardware>",
        "<pin>0x2FFFFFB3</pin><bsn>1128121361</bsn>",
        "<imei>004401139269240</imei>",
        "<id>0x{0}</id>".format(device),
        "</hardware>",
        "<network>",
        "<homeNPC>0x{0}</homeNPC>".format(npc),
        "<iccid>89014104255505565333</iccid>",
        "</network>",
        "<software>",
        "<currentLocale>en_US</currentLocale>",
        "<legalLocale>en_US</legalLocale>",
        "</software>",
        "</clientProperties>",
        "<updateDirectives>",
        '<allowPatching type="REDBEND">true</allowPatching>',
        "<upgradeMode>{0}</upgradeMode>".format(upg),
        "<provideDescriptions>false</provideDescriptions>",
        "<provideFiles>true</provideFiles>",
        "<queryType>NOTIFICATION_CHECK</queryType>",
        "</updateDirectives>",
        "<pollType>manual</pollType>",
        "<resultPackageSetCriteria>",
        '<softwareRelease softwareReleaseVersion="{0}" />'.format(forced),
        "<releaseIndependent>",
        '<packageType operation="include">application</packageType>',
        "</releaseIndependent>",
        "</resultPackageSetCriteria>",
        "</updateDetailRequest>",
    ]
    query = "".join(fragments)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(url, headers=header, data=query)
    return parse_carrier_xml(req.text, blitz)
743
744
745 5
def carrier_swver_get(root):
    """
    Get software release from carrier XML.

    Returns the "softwareReleaseVersion" attribute of the last
    "softwareReleaseMetadata" element found anywhere under root, or "N/A"
    when there is no such element.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    # default so an XML tree without the element does not raise
    swver = "N/A"
    for child in root.iter("softwareReleaseMetadata"):
        swver = child.get("softwareReleaseVersion")
    return swver
755
756
757 5
def carrier_child_fileappend(child, files, baseurl, blitz=False):
    """
    Add a package element's file link to the list; in blitz mode, skip radio, desktop and OS packages.

    :param child: Child element in use.
    :type child: xml.etree.ElementTree.Element

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    system_types = ["system:radio", "system:desktop", "system:os"]
    if blitz and child.get("type") in system_types:
        return files  # system packages are left out of a blitz
    files.append(baseurl + child.get("path"))
    return files
779
780
781 5
def carrier_child_finder(root, files, baseurl, blitz=False):
    """
    Walk every package element, collecting file links plus the OS and radio versions.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    osver = ""
    radver = ""
    for package in root.iter("package"):
        files = carrier_child_fileappend(package, files, baseurl, blitz)
        kind = package.get("type")
        if kind == "system:radio":
            radver = package.get("version")
        elif kind in ("system:desktop", "system:os"):
            osver = package.get("version")
    return osver, radver, files
807
808
809 5
def parse_carrier_xml(data, blitz=False):
    """
    Read a carrier update response and return software release, OS version, radio version and file links.

    :param data: The data to parse.
    :type data: xml

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    root = ElementTree.fromstring(data)
    if root.find('./data/content/softwareReleaseMetadata') is None:
        swver = "N/A"
    else:
        swver = carrier_swver_get(root)
    files = []
    osver = radver = ""
    fileset = root.find('./data/content/fileSets/fileSet')
    if fileset is not None:
        baseurl = "{0}/".format(fileset.get("url"))
        osver, radver, files = carrier_child_finder(root, files, baseurl, blitz)
    return swver, osver, radver, files
831
832
833 5
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Look up the software release for an OS version on the chosen server.

    Server URLs come from :data:`bbarchivist.bbconstants.SERVERS`.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # the request body, one fragment per line, joined without separators
    fragments = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<srVersionLookupRequest version="2.0.0"',
        ' authEchoTS="1366644680359">',
        '<clientProperties><hardware>',
        '<pin>0x2FFFFFB3</pin><bsn>1140011878</bsn>',
        '<imei>004402242176786</imei><id>0x8D00240A</id>',
        '<isBootROMSecure>true</isBootROMSecure>',
        '</hardware>',
        '<network>',
        '<vendorId>0x0</vendorId><homeNPC>0x60</homeNPC>',
        '<currentNPC>0x60</currentNPC><ecid>0x1</ecid>',
        '</network>',
        '<software><currentLocale>en_US</currentLocale>',
        '<legalLocale>en_US</legalLocale>',
        '<osVersion>{0}</osVersion>'.format(osver),
        '<omadmEnabled>false</omadmEnabled>',
        '</software></clientProperties>',
        '</srVersionLookupRequest>',
    ]
    query = "".join(fragments)
    reqtext = sr_lookup_poster(query, server, session)
    return sr_lookup_xmlparser(reqtext)
869
870
871 5
def sr_lookup_poster(query, server, session=None):
    """
    Send the lookup XML to a server with a one-second timeout; return the response text, or "SR not in system" on timeout or connection failure.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        req = session.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return req.text
893
894
895 5
def sr_lookup_xmlparser(reqtext):
    """
    Parse lookup response text as XML and extract the release; unparseable text gives "SR not in system".

    :param reqtext: Response text, hopefully XML formatted.
    :type reqtext: str
    """
    try:
        root = ElementTree.fromstring(reqtext)
    except ElementTree.ParseError:
        return "SR not in system"
    return sr_lookup_extractor(root)
909
910
911 5
def sr_lookup_extractor(root):
    """
    Take a parsed lookup response and extract a software release from it.

    The first child of ``./data/content`` that carries text decides the
    result: its text is returned if it starts with a four-part dotted
    version number, otherwise "SR not in system" is returned. The same
    placeholder is returned when no child carries any text, so the caller
    always gets a string back rather than an implicit None.

    :param root: Root element of the parsed response.
    :type root: xml.etree.ElementTree.Element
    """
    version_re = re.compile(r"(\d{1,4}\.)(\d{1,4}\.)(\d{1,4}\.)(\d{1,4})")
    # the trailing slash makes ElementTree select every child of <content>
    for package in root.findall('./data/content/'):
        if package.text is None:
            continue
        return package.text if version_re.match(package.text) else "SR not in system"
    # nothing usable in the response at all
    return "SR not in system"
925
926
927 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS.

    Returns a dict mapping each server key ("p", "a1", "a2", "b1", "b2")
    to the result of sr_lookup for that server. If interrupted with
    Ctrl-C, the executor is shut down and None is returned.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    keys = ["p", "a1", "a2", "b1", "b2"]
    if no2:
        keys = [key for key in keys if key not in ("a2", "b2")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            # Submit every lookup before waiting on any of them; calling
            # .result() right after submit() would run them one at a time.
            futures = [(key, xec.submit(sr_lookup, osv, SERVERS[key], session)) for key in keys]
            return {key: future.result() for key, future in futures}
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
957
958
959 5
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    List the software release versions a server reports for a carrier/device pair.

    Posts an availableBundlesRequest and returns the "version" attribute
    of every child of ``./data/content`` in the reply.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    npc = return_npc(mcc, mnc)
    # the payload is assembled from fragments; only device and npc vary
    fragments = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<availableBundlesRequest version="1.0.0" ',
        'authEchoTS="1366644680359">',
        '<deviceId><pin>0x2FFFFFB3</pin></deviceId>',
        '<clientProperties><hardware><id>0x{0}</id>'.format(device),
        '<isBootROMSecure>true</isBootROMSecure></hardware>',
        '<network><vendorId>0x0</vendorId><homeNPC>0x{0}</homeNPC>'.format(npc),
        '<currentNPC>0x{0}</currentNPC></network><software>'.format(npc),
        '<currentLocale>en_US</currentLocale>',
        '<legalLocale>en_US</legalLocale>',
        '<osVersion>10.0.0.0</osVersion>',
        '<radioVersion>10.0.0.0</radioVersion></software>',
        '</clientProperties><updateDirectives><bundleVersionFilter>',
        '</bundleVersionFilter></updateDirectives>',
        '</availableBundlesRequest>',
    ]
    payload = "".join(fragments)
    response = sess.post(
        "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=payload
    )
    content = ElementTree.fromstring(response.text).find('./data/content')
    return [bundle.attrib["version"] for bundle in content]
1000
1001
1002 5
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Fetch a PTCRB device page and return its cleaned-up certification entries.

    Takes every other cell, starting with the second, of the second table
    on the page and runs each cell's text through ptcrb_item_cleaner.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    sess = generic_session(session)
    browser_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/61.0.3163.79 Safari/537.36"
    )
    sess.headers.update({"User-agent": browser_agent})
    page = generic_soup_parser(url, sess)
    cells = page.find_all("table")[1].find_all("td")
    # odd-indexed cells hold the values
    return [ptcrb_item_cleaner(cell.text.strip()) for cell in cells[1::2]]
1023
1024
1025 5
def space_pad(instring, minlength):
    """
    Pad a string with trailing spaces until it's the minimum length.

    Strings already at least minlength long are returned unchanged.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    # str.ljust does the padding in one step instead of one space per loop pass
    return instring.ljust(minlength)
1038
1039
1040 5
def ptcrb_cleaner_multios(item):
    """
    Keep only the first "OS" entry when an item mentions "OS" more than once.

    Whatever precedes the first "OS" and everything from the second "OS"
    onwards is thrown away. Items with zero or one "OS" pass through untouched.

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") <= 1:
        return item
    segments = item.split("OS")
    # segments[1] is the text between the first and second "OS"
    return "OS" + segments[1]
1052
1053
1054 5
def ptcrb_cleaner_spaces(item):
    """
    Pad the second and fourth space-separated fields of an item to 11 characters.

    Fields that do not exist are skipped.

    :param item: The item to clean.
    :type item: str
    """
    words = item.split(" ")
    for position in (1, 3):
        if len(words) > position:
            words[position] = space_pad(words[position], 11)
    return " ".join(words)
1068
1069
1070 5
def ptcrb_item_cleaner(item):
    """
    Normalize an inconsistently formatted PTCRB entry.

    The substitutions are applied strictly in the order listed; later
    steps depend on what the earlier ones produced.

    :param item: The item to clean.
    :type item: str
    """
    # stage 1: strip markup and unify the "OS" prefix
    for old, new in (
            ("<td>", ""),
            ("</td>", ""),
            ("\n", ""),
            ("SW: OS", "OS"),
            ("Software Version: OS", "OS"),
            (" (SR", ", SR"),
    ):
        item = item.replace(old, new)
    # stage 2: drop anything from an opening parenthesis or " SV" to the end
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    # stage 3: punctuation and spelling fixes
    for old, new in (
            (")", ""),
            (". ", "."),
            (";", ""),
            ("version", "Version"),
            ("Verison", "Version"),
    ):
        item = item.replace(old, new)
    item = ptcrb_cleaner_multios(item)
    # stage 4: relabel fields
    for old, new in (
            ("SR10", "SR 10"),
            ("SR", "SW Release"),
            (" Version:", ":"),
            ("Version ", " "),
            (":1", ": 1"),
            (", ", " "),
            (",", " "),
            ("Software", "SW"),
            ("  ", " "),
            ("OS ", "OS: "),
            ("Radio ", "Radio: "),
            ("Release ", "Release: "),
    ):
        item = item.replace(old, new)
    item = ptcrb_cleaner_spaces(item)
    item = item.strip()
    item = item.replace("\r", "")
    # a bare version number gets an "OS: " label
    if item.startswith("10"):
        item = "OS: {0}".format(item)
    # stage 5: collapse the padding that ended up right after a colon
    for old, new in ((":    ", ": "), (":   ", ": ")):
        item = item.replace(old, new)
    return item
1111
1112
1113 5
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Collect branch names from the branch listing pages of a BlackBerry GitHub repo.

    Pages 1 through 9 are walked; the walk stops early at the first page
    that contains a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if utils:
        repo = "android-utils"
    else:
        repo = "android-linux-kernel"
    kernlist = []
    sess = generic_session(session)
    page = 1
    while page < 10:
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        found = re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", soup.get_text(), re.IGNORECASE)
        kernlist.extend(found)
        page += 1
    return kernlist
1136
1137
1138 5
def root_generator(folder, build, variant="common"):
    """
    Build the per-device root folders used in the SHAxxx hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv: folder depends on the carrier variant
    priv_root = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50: AAF builds live in a different folder
    if build[:3] == "AAF":
        dtek_root = "bbSupport/DTEK50"
    else:
        dtek_root = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 shares the DTEK50 folder
    return {"Priv": priv_root, "DTEK50": dtek_root, "DTEK60": dtek_root}
1160
1161
1162 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Build the autoloader or hash file URL for a device hosted on the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    platforms = {"KEYone": "qc8953", "Motion": "qc8953"}
    stem = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is not None:
        # hash files carry the lowercased method name plus "sum" as extension
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(stem, method.lower())
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(stem)
1185
1186
1187 5
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Build the autoloader or hash file URL for a device hosted on the original site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    carrier_folders = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    platforms = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    hash_roots = root_generator(carrier_folders, build, variant)
    stem = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is not None:
        # hash files sit under a per-device root on a different host
        hash_host = "https://ca.blackberry.com/content/dam"
        return "{3}/{1}/{0}.{2}sum".format(stem, hash_roots[device], method.lower(), hash_host)
    loader_host = "https://bbapps.download.blackberry.com/Priv"
    return "{1}/{0}.zip".format(stem, loader_host)
1214
1215
1216 5
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    Dispatches to the original-site or BB Mobile-site URL builder
    depending on the device.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is on neither list.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion")   # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # previously this fell through to an unbound local; fail with a clear message
    raise ValueError("Unsupported device: {0!r}".format(device))
1240
1241
1242 5
def bulk_droid_skeletons(devs, build, method=None):
    """
    Build the autoloader/hash URLs for every device and each of its variants.

    Only devices listed as carrier devices get their carrier variants;
    all others get the single "common" variant.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    carrier_devices = ("Priv", )  # add KEYone when verified
    skels = []
    for dev in devs:
        if dev in carrier_devices:
            variants = carrier_variants[dev]
        else:
            variants = ("common", )  # for single-variant devices
        skels.extend(make_droid_skeleton(method, build, dev, var) for var in variants)
    return skels
1268
1269
1270 5
def prepare_droid_list(device):
    """
    Wrap a single device in a list; hand a list back as it is.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
1282
1283
1284 5
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    Returns the list of URLs that were found to be available, or None if
    there were none (including when no devices were given).

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # ThreadPoolExecutor raises ValueError for max_workers=0,
        # and with nothing to check the answer is "nothing found" anyway
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None
1305
1306
1307 5
def droid_scanner_worker(xec, skels, session=None):
    """
    Worker to check for Android autoloaders.

    Returns the skeleton URLs for which the availability check came back
    truthy, in their original order.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Queue every check before waiting on any result; waiting right after
    # each submit() would run the checks one at a time.
    pending = [(skel, xec.submit(availability, skel, session)) for skel in skels]
    return [skel for skel, future in pending if future.result()]
1326
1327
1328 5
def chunker(iterable, inc):
    """
    Slice a sequence into consecutive pieces of at most inc items each.

    The last piece is shorter when the length is not a multiple of inc.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    chunks = []
    for start in range(0, len(iterable), inc):
        chunks.append(iterable[start:start + inc])
    return chunks
1340
1341
1342 5
def unicode_filter(intext):
    """
    Drop en dashes (U+2013) from the text, then trim surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    without_dashes = intext.replace("\u2013", "")
    return without_dashes.strip()
1350
1351
1352 5
def table_header_filter(ptag):
    """
    Decide whether a p tag is a relevant table header.

    A tag qualifies when it contains a b tag, mentions "BlackBerry" and
    does not mention "experts". When there is no b tag, the (falsy)
    lookup result itself is returned.

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        return bold
    text = ptag.text
    return "BlackBerry" in text and "experts" not in text
1361
1362
1363 5
def table_headers(pees):
    """
    Collect the text of every p tag that passes table_header_filter.

    :param pees: List of p tags.
    :type pees: list(bs4.element.Tag)
    """
    bolds = []
    for ptag in pees:
        if table_header_filter(ptag):
            bolds.append(ptag.text)
    return bolds
1372
1373
1374 5
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape and print both autoloader pages, original site first.

    Nothing is returned; the scrapers print what they find.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    for scrape in (loader_page_scraper_og, loader_page_scraper_bbm):
        scrape(sess)
1385
1386
1387 5
def loader_page_scraper_og(session=None):
    """
    Scrape the autoloader page on the original site and print each table.

    Tables are paired by position with the headers extracted from the
    page's p tags.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    page = generic_soup_parser(
        "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html",
        session
    )
    tables = page.find_all("table")
    headings = table_headers(page.find_all("p"))
    for position, table in enumerate(tables):
        loader_page_chunker_og(position, table, headings)
1400
1401
1402 5
def loader_page_scraper_bbm(session=None):
    """
    Scrape the autoloader page on the new site and print each list.

    The first matching list on the page is skipped.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    page = generic_soup_parser("https://www.blackberrymobile.com/support/reload-software/", session)
    class_pattern = re.compile("list-two special-.")
    matching_lists = page.find_all("ul", {"class": class_pattern})
    print("~~~BlackBerry KEYone~~~")
    for ull in matching_lists[1:]:
        loader_page_chunker_bbm(ull)
1415
1416
1417 5
def loader_page_chunker_og(idx, table, headers):
    """
    Print one loader page table: its header, then its cells in groups of four.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    title = headers[idx]
    print("~~~{0}~~~".format(title))
    cells = table.find_all("td")
    for row in chunker(cells, 4):
        loader_page_printer(row)
    print(" ")
1435
1436
1437 5
def loader_page_chunker_bbm(ull):
    """
    Print one loader page list, taking its list items in groups of three.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    entries = ull.find_all("li")
    for triple in chunker(entries, 3):
        loader_page_printer(triple)
1447
1448
1449 5
def loader_page_printer(chunk):
    """
    Print one loader entry from a group of cells.

    Uses the text of the first two cells and the href of the link inside
    the third; any further cells are ignored.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    raw_fields = [chunk[0].text, chunk[1].text, chunk[2].find("a")["href"]]
    key, ver, link = [unicode_filter(field) for field in raw_fields]
    print("{0}\n    {1}: {2}".format(key, ver, link))
1460
1461
1462 5
@pem_wrapper
def base_metadata(url, session=None):
    """
    Download a BBNDK metadata file and return the second field of every line.

    Lines are split on commas; empty lines are skipped.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    raw = sess.get(url).content
    metadata = []
    for line in raw.split(b"\n"):
        if not line:
            continue
        second_field = line.split(b",")[1]
        metadata.append(second_field.decode("utf-8"))
    return metadata
1479
1480
1481 5
def base_metadata_url(alternate=None):
    """
    Build the metadata URL, optionally for an alternate metadata set.

    :param alternate: Name of the alternate set (e.g. "simulator"). Default is None.
    :type alternate: str
    """
    if alternate is None:
        tail = "metadata"
    else:
        # alternates live in their own folder with a prefixed file name
        tail = "{0}/{0}_metadata".format(alternate)
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    return "{0}/{1}".format(baseurl, tail)
1491
1492
1493 5
def ndk_metadata(session=None):
    """
    Get the BBNDK target metadata entries that start with 10.0, 10.1 or 10.2.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted = ("10.0", "10.1", "10.2")
    metadata = []
    for entry in base_metadata(base_metadata_url(), session):
        if entry.startswith(wanted):
            metadata.append(entry)
    return metadata
1504
1505
1506 5
def sim_metadata(session=None):
    """
    Fetch the BBNDK simulator metadata entries.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("simulator"), session)
1516
1517
1518 5
def runtime_metadata(session=None):
    """
    Fetch the BBNDK runtime metadata entries.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("runtime"), session)
1528
1529
1530 5
def series_generator(osversion):
    """
    Turn an OS version into its series/branch name, e.g. 10.3.2.x -> BB10_3_2.

    :param osversion: OS version.
    :type osversion: str
    """
    # only the first three dot-separated fields are used
    parts = osversion.split(".", 3)
    return "BB{0}_{1}_{2}".format(parts[0], parts[1], parts[2])
1539
1540
1541 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Probe one candidate Dev Alpha autoloader URL with a HEAD request.

    Returns a (url, content-length) tuple when the server answers 200,
    otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
    response = sess.head(url)
    if response.status_code != 200:
        return ()
    return (url, response.headers["content-length"])
1564
1565
1566 5
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Every "<SERIES>" placeholder is replaced with the series name derived
    from the OS version. A new list is returned; the list passed in is left
    untouched so it can be reused for a different OS version.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list
    """
    # Build a fresh list: writing into the caller's list would permanently
    # bake the first OS version's series into the shared skeletons.
    return [
        skel.replace("<SERIES>", series_generator(osversion)) if "<SERIES>" in skel else skel
        for skel in skeletons
    ]
1581
1582
1583 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Construct list of valid Dev Alpha autoloader URLs.

    Returns a dict of URL: content-length pairs for the candidates that
    exist on the server.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    skels = devalpha_urls_serieshandler(osversion, skeletons)
    # Queue every probe before waiting on any result; waiting right after
    # each submit() would run the probes one at a time.
    pending = [xec.submit(devalpha_urls, osversion, skel, session) for skel in skels]
    finals = {}
    for future in pending:
        final = future.result()
        if final:
            finals[final[0]] = final[1]
    return finals
1606
1607
1608 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Run the bulk Dev Alpha URL check inside a five-worker thread pool.

    Returns whatever devalpha_urls_bulk returns; if interrupted with
    Ctrl-C the pool is shut down and None is returned.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        try:
            finals = devalpha_urls_bulk(osversion, skeletons, pool, session)
        except KeyboardInterrupt:
            pool.shutdown(wait=False)
        else:
            return finals
1626
1627
1628 5
def dev_dupe_dicter(finals):
    """
    Invert a URL: content-length dict into content-length: set of URLs.

    URLs that share a content length end up in the same set.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_size = {}
    for url, size in finals.items():
        if size not in by_size:
            by_size[size] = set()
        by_size[size].add(url)
    return by_size
1639
1640
1641 5
def dev_dupe_remover(finals, dupelist):
    """
    Delete every duplicate entry whose URL contains "DevAlpha".

    The dictionary is modified in place and also returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: List of groups of duplicate URLs.
    :type dupelist: list(set(str))
    """
    doomed = [entry for dupe in dupelist for entry in dupe if "DevAlpha" in entry]
    for entry in doomed:
        del finals[entry]
    return finals
1656
1657
1658 5
def dev_dupe_cleaner(finals):
    """
    Remove duplicate autoloader entries from a URL: content-length dict.

    URLs sharing a content length count as duplicates of each other.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    groups = dev_dupe_dicter(finals)
    dupelist = [urls for urls in groups.values() if len(urls) > 1]
    return dev_dupe_remover(finals, dupelist)
1669