Passed
Branch master (604dcd)
by John
02:16
created

bbarchivist.networkutils   F

Complexity

Total Complexity 192

Size/Duplication

Total Lines 1669
Duplicated Lines 0 %

Test Coverage

Coverage 99.7%

Importance

Changes 0
Metric Value
eloc 679
dl 0
loc 1669
ccs 668
cts 670
cp 0.997
rs 0.6314
c 0
b 0
f 0
wmc 192

80 Functions

Rating   Name   Duplication   Size   Complexity  
B download_bootstrap() 0 29 5
A grab_pem() 0 10 3
B ptcrb_item_cleaner() 0 41 2
A loader_page_scraper_bbm() 0 13 2
A remove_empty_download() 0 9 2
B bulk_droid_skeletons() 0 26 4
A table_header_filter() 0 9 1
B download_writer() 0 29 4
A droid_scanner() 0 21 3
B make_droid_skeleton() 0 24 3
A droid_scanner_worker() 0 19 3
A get_length() 0 20 3
A pem_wrapper() 0 14 1
A base_metadata_url() 0 10 2
A make_droid_skeleton_bbm() 0 23 2
A prepare_droid_list() 0 12 2
A chunker() 0 12 2
A download() 0 21 1
A space_pad() 0 13 2
A ptcrb_cleaner_spaces() 0 14 3
A root_generator() 0 22 2
A loader_page_chunker_og() 0 18 2
B try_try_again() 0 23 5
A generic_session() 0 9 2
A generic_soup_parser() 0 14 1
A loader_page_chunker_bbm() 0 10 2
A ptcrb_cleaner_multios() 0 12 2
A unicode_filter() 0 8 1
A loader_page_scraper_og() 0 13 2
A devalpha_urls_bulk() 0 23 3
A tcl_master() 0 5 1
A table_headers() 0 9 3
A dump_tcl_xml() 0 12 3
A encrypt_header() 0 23 3
B kernel_scraper() 0 23 4
A loader_page_printer() 0 11 1
A encrypt_header_prep() 0 15 2
B vkhash() 0 34 1
A series_generator() 0 9 1
A getcode() 0 18 2
A ptcrb_scraper() 0 21 3
B download_request_prep() 0 37 2
B tcl_check() 0 32 3
A dev_dupe_remover() 0 15 4
A availability() 0 14 1
A unpack_vdkey() 0 7 1
B parse_carrier_xml() 0 22 4
A parse_tcl_download_request() 0 17 2
A remote_prd_info() 0 10 3
A devalpha_urls() 0 23 2
B tcl_download_request() 0 44 3
A parse_tcl_check() 0 15 1
A carrier_checker() 0 22 1
B make_droid_skeleton_og() 0 27 2
A cchecker_get_tags() 0 13 4
A tcl_default_id() 0 10 2
B carrier_child_finder() 0 26 5
A loader_page_scraper() 0 11 1
A sim_metadata() 0 10 1
A dev_dupe_cleaner() 0 11 3
A download_android_tools() 0 17 4
A return_npc() 0 11 1
A devalpha_urls_bootstrap() 0 18 3
A carrier_query() 0 62 3
B check_prep() 0 32 1
A sr_lookup_extractor() 0 14 4
B sr_lookup() 0 36 1
A base_metadata() 0 17 3
A sr_lookup_xmlparser() 0 14 3
A sr_lookup_poster() 0 22 3
A tcl_salt() 0 7 1
B available_bundle_lookup() 0 41 2
A devalpha_urls_serieshandler() 0 15 3
A carrier_child_fileappend() 0 22 3
A ndk_metadata() 0 11 3
A carrier_swver_get() 0 10 2
B sr_lookup_bootstrap() 0 30 5
A runtime_metadata() 0 10 1
A clean_availability() 0 14 3
A dev_dupe_dicter() 0 11 2

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.networkutils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1668/1000)
Loading history...
2 4
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import base64  # encoding
5 5
import binascii  # encoding
6 5
import concurrent.futures  # multiprocessing/threading
7 5
import glob  # pem file lookup
8 5
import hashlib  # salt
9 5
import os  # filesystem read
10 5
import random  # salt
11 5
import re  # regexes
12 5
import time  # salt
13 5
import zlib  # encoding
14
15 5
import requests  # downloading
16 5
from bs4 import BeautifulSoup  # scraping
17 5
from bbarchivist import utilities  # parse filesize
18 5
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers
19
20 5
try:
21 5
    from defusedxml import ElementTree  # safer XML parsing
22
except (ImportError, AttributeError):
23
    from xml.etree import ElementTree  # XML parsing
24
25 5
__author__ = "Thurask"
26 5
__license__ = "WTFPL v2"
27 5
__copyright__ = "2015-2018 Thurask"
28
29
30 5
def grab_pem():
31
    """
32
    Work with either local cacerts or system cacerts.
33
    """
34 5
    try:
35 5
        pemfile = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))[0]
36 5
    except IndexError:
37 5
        return requests.certs.where()  # no local cacerts
38
    else:
39 5
        return os.path.abspath(pemfile)  # local cacerts
40
41
42 5
def pem_wrapper(method):
43
    """
44
    Decorator to set REQUESTS_CA_BUNDLE.
45
46
    :param method: Method to use.
47
    :type method: function
48
    """
49 5
    def wrapper(*args, **kwargs):
50
        """
51
        Set REQUESTS_CA_BUNDLE before doing function.
52
        """
53 5
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
54 5
        return method(*args, **kwargs)
55 5
    return wrapper
56
57
58 5
def try_try_again(method):
59
    """
60
    Decorator to absorb timeouts, proxy errors, and other common exceptions.
61
62
    :param method: Method to use.
63
    :type method: function
64
    """
65 5
    def wrapper(*args, **kwargs):
66
        """
67
        Try function, try it again up to five times, and leave gracefully.
68
        """
69 5
        tries = 5
70 5
        for _ in range(tries):
71 5
            try:
72 5
                result = method(*args, **kwargs)
73 5
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
74 5
                continue
75
            else:
76 5
                break
77
        else:
78 5
            result = None
79 5
        return result
80 5
    return wrapper
81
82
83 5
def generic_session(session=None):
84
    """
85
    Create a Requests session object on the fly, if need be.
86
87
    :param session: Requests session object, created if this is None.
88
    :type session: requests.Session()
89
    """
90 5
    sess = requests.Session() if session is None else session
91 5
    return sess
92
93
94 5
def generic_soup_parser(url, session=None):
95
    """
96
    Get a BeautifulSoup HTML parser for some URL.
97
98
    :param url: The URL to check.
99
    :type url: str
100
101
    :param session: Requests session object, default is created on the fly.
102
    :type session: requests.Session()
103
    """
104 5
    session = generic_session(session)
105 5
    req = session.get(url)
106 5
    soup = BeautifulSoup(req.content, "html.parser")
107 5
    return soup
108
109
110 5
@pem_wrapper
111 5
def get_length(url, session=None):
112
    """
113
    Get content-length header from some URL.
114
115
    :param url: The URL to check.
116
    :type url: str
117
118
    :param session: Requests session object, default is created on the fly.
119
    :type session: requests.Session()
120
    """
121 5
    session = generic_session(session)
122 5
    if url is None:
123 5
        return 0
124 5
    try:
125 5
        heads = session.head(url)
126 5
        fsize = heads.headers['content-length']
127 5
        return int(fsize)
128 5
    except requests.ConnectionError:
129 5
        return 0
130
131
132 5
@pem_wrapper
133 5
def download(url, output_directory=None, session=None):
134
    """
135
    Download file from given URL.
136
137
    :param url: URL to download from.
138
    :type url: str
139
140
    :param output_directory: Download folder. Default is local.
141
    :type output_directory: str
142
143
    :param session: Requests session object, default is created on the fly.
144
    :type session: requests.Session()
145
    """
146 5
    session = generic_session(session)
147 5
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
148 5
    lfname = url.split('/')[-1]
149 5
    sname = utilities.stripper(lfname)
150 5
    fname = os.path.join(output_directory, lfname)
151 5
    download_writer(url, fname, lfname, sname, session)
152 5
    remove_empty_download(fname)
153
154
155 5
def remove_empty_download(fname):
156
    """
157
    Remove file if it's empty.
158
159
    :param fname: File path.
160
    :type fname: str
161
    """
162 5
    if os.stat(fname).st_size == 0:
163 5
        os.remove(fname)
164
165
166 5
def download_writer(url, fname, lfname, sname, session=None):
167
    """
168
    Download file and write to disk.
169
170
    :param url: URL to download from.
171
    :type url: str
172
173
    :param fname: File path.
174
    :type fname: str
175
176
    :param lfname: Long filename.
177
    :type lfname: str
178
179
    :param sname: Short name, for printing to screen.
180
    :type sname: str
181
182
    :param session: Requests session object, default is created on the fly.
183
    :type session: requests.Session()
184
    """
185 5
    with open(fname, "wb") as file:
186 5
        req = session.get(url, stream=True)
187 5
        clength = req.headers['content-length']
188 5
        fsize = utilities.fsizer(clength)
189 5
        if req.status_code == 200:  # 200 OK
190 5
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
191 5
            for chunk in req.iter_content(chunk_size=1024):
192 5
                file.write(chunk)
193
        else:
194 5
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
195
196
197 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
198
    """
199
    Run downloaders for each file in given URL iterable.
200
201
    :param urls: URLs to download.
202
    :type urls: list
203
204
    :param outdir: Download folder. Default is handled in :func:`download`.
205
    :type outdir: str
206
207
    :param workers: Number of worker processes. Default is 5.
208
    :type workers: int
209
210
    :param session: Requests session object, default is created on the fly.
211
    :type session: requests.Session()
212
    """
213 5
    workers = len(urls) if len(urls) < workers else workers
214 5
    spinman = utilities.SpinManager()
215 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
216 5
        try:
217 5
            spinman.start()
218 5
            for url in urls:
219 5
                xec.submit(download, url, outdir, session)
220 5
        except (KeyboardInterrupt, SystemExit):
221 5
            xec.shutdown()
222 5
            spinman.stop()
223 5
    spinman.stop()
224 5
    utilities.spinner_clear()
225 5
    utilities.line_begin()
226
227
228 5
def download_android_tools(downloaddir=None):
229
    """
230
    Download Android SDK platform tools.
231
232
    :param downloaddir: Directory name, default is "plattools".
233
    :type downloaddir: str
234
    """
235 5
    if downloaddir is None:
236 5
        downloaddir = "plattools"
237 5
    if os.path.exists(downloaddir):
238 5
        os.removedirs(downloaddir)
239 5
    os.mkdir(downloaddir)
240 5
    platforms = ("windows", "linux", "darwin")
241 5
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
242 5
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
243 5
    sess = generic_session()
244 5
    download_bootstrap(dlurls, outdir="plattools", session=sess)
245
246
247 5
@pem_wrapper
248 5
def getcode(url, session=None):
249
    """
250
    Return status code of given URL.
251
252
    :param url: URL to check.
253
    :type url: str
254
255
    :param session: Requests session object, default is created on the fly.
256
    :type session: requests.Session()
257
    """
258 5
    session = generic_session(session)
259 5
    try:
260 5
        shead = session.head(url)
261 5
        status = int(shead.status_code)
262 5
        return status
263 5
    except requests.ConnectionError:
264 5
        return 404
265
266
267 5
@pem_wrapper
268 5
def availability(url, session=None):
269
    """
270
    Check HTTP status code of given URL.
271
    200 or 301-308 is OK, else is not.
272
273
    :param url: URL to check.
274
    :type url: str
275
276
    :param session: Requests session object, default is created on the fly.
277
    :type session: requests.Session()
278
    """
279 5
    status = getcode(url, session)
280 5
    return status == 200 or 300 < status <= 308
281
282
283 5
def clean_availability(results, server):
284
    """
285
    Clean availability for autolookup script.
286
287
    :param results: Result dict.
288
    :type results: dict(str: str)
289
290
    :param server: Server, key for result dict.
291
    :type server: str
292
    """
293 5
    marker = "PD" if server == "p" else server.upper()
294 5
    rel = results[server.lower()]
295 5
    avail = marker if rel != "SR not in system" and rel is not None else "  "
296 5
    return rel, avail
297
298
299 5
def tcl_master():
300
    """
301
    Get a random master server.
302
    """
303 5
    return random.choice(TCLMASTERS)
304
305
306 5
def tcl_default_id(devid):
307
    """
308
    Get an IMEI or a serial number or something.
309
310
    :param devid: Return default if this is None.
311
    :type devid: str
312
    """
313 5
    if devid is None:
314 5
        devid = "543212345000000"
315 5
    return devid
316
317
318 5
def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
319
    """
320
    Prepare variables for TCL update check.
321
322
    :param curef: PRD of the phone variant to check.
323
    :type curef: str
324
325
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
326
    :type mode: int
327
328
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
329
    :type fvver: str
330
331
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
332
    :type cltp: int
333
334
    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
335
    :type cktp: int
336
337
    :param rtd: 2 if rooted, 1 if not. Default is 1.
338
    :type rtd: int
339
340
    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
341
    :type chnl: int
342
343
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
344
    :type devid: str
345
    """
346 5
    devid = tcl_default_id(devid)
347 5
    geturl = "http://{0}/check.php".format(tcl_master())
348 5
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "cltp": cltp, "cktp": cktp, "rtd": rtd, "chnl": chnl}
349 5
    return geturl, params
350
351
352 5
@pem_wrapper
353 5
@try_try_again
354 5
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
355
    """
356
    Check TCL server for updates.
357
358
    :param curef: PRD of the phone variant to check.
359
    :type curef: str
360
361
    :param session: Requests session object, default is created on the fly.
362
    :type session: requests.Session()
363
364
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
365
    :type mode: int
366
367
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
368
    :type fvver: str
369
370
    :param export: Whether to export XML response to file. Default is False.
371
    :type export: bool
372
    """
373 5
    sess = generic_session(session)
374 5
    geturl, params = check_prep(curef, mode, fvver)
375 5
    req = sess.get(geturl, params=params)
376 5
    if req.status_code == 200:
377 5
        req.encoding = "utf-8"
378 5
        response = req.text
379 5
        if export:
380 5
            dump_tcl_xml(response)
381
    else:
382 5
        response = None
383 5
    return response
384
385
386 5
def parse_tcl_check(data):
387
    """
388
    Extract version and file info from TCL update server response.
389
390
    :param data: The data to parse.
391
    :type data: str
392
    """
393 5
    root = ElementTree.fromstring(data)
394 5
    tvver = root.find("VERSION").find("TV").text
395 5
    fwid = root.find("FIRMWARE").find("FW_ID").text
396 5
    fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
397 5
    filename = fileinfo.find("FILENAME").text
398 5
    filesize = fileinfo.find("SIZE").text
399 5
    filehash = fileinfo.find("CHECKSUM").text
400 5
    return tvver, fwid, filename, filesize, filehash
401
402
403 5
def tcl_salt():
404
    """
405
    Generate salt value for TCL server tools.
406
    """
407 5
    millis = round(time.time() * 1000)
408 5
    tail = "{0:06d}".format(random.randint(0, 999999))
409 5
    return "{0}{1}".format(str(millis), tail)
410
411
412 5
def dump_tcl_xml(xmldata):
413
    """
414
    Write XML responses to output directory.
415
416
    :param xmldata: Response XML.
417
    :type xmldata: str
418
    """
419 5
    outfile = os.path.join(os.getcwd(), "logs", "{0}.xml".format(tcl_salt()))
420 5
    if not os.path.exists(os.path.dirname(outfile)):
421 5
        os.makedirs(os.path.dirname(outfile))
422 5
    with open(outfile, "w", encoding="utf-8") as afile:
423 5
        afile.write(xmldata)
424
425
426 5
def unpack_vdkey():
427
    """
428
    Draw the curtain back.
429
    """
430 5
    vdkey = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
431 5
    vdk = zlib.decompress(binascii.a2b_base64(vdkey))
432 5
    return vdk.decode("utf-8")
433
434
435 5
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
436
    """
437
    Generate hash from TCL update server variables.
438
439
    :param curef: PRD of the phone variant to check.
440
    :type curef: str
441
442
    :param tvver: Target software version.
443
    :type tvver: str
444
445
    :param fwid: Firmware ID for desired download file.
446
    :type fwid: str
447
448
    :param salt: Salt hash.
449
    :type salt: str
450
451
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
452
    :type mode: int
453
454
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
455
    :type fvver: str
456
457
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
458
    :type cltp: int
459
460
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
461
    :type devid: str
462
    """
463 5
    vdk = unpack_vdkey()
464 5
    devid = tcl_default_id(devid)
465 5
    query = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}".format(devid, salt, curef, fvver, tvver, "Firmware", fwid, mode, cltp, vdk)
466 5
    engine = hashlib.sha1()
467 5
    engine.update(bytes(query, "utf-8"))
468 5
    return engine.hexdigest()
469
470
471 5
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
472
    """
473
    Prepare variables for download server check.
474
475
    :param curef: PRD of the phone variant to check.
476
    :type curef: str
477
478
    :param tvver: Target software version.
479
    :type tvver: str
480
481
    :param fwid: Firmware ID for desired download file.
482
    :type fwid: str
483
484
    :param salt: Salt hash.
485
    :type salt: str
486
487
    :param vkh: VDKey-based hash.
488
    :type vkh: str
489
490
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
491
    :type mode: int
492
493
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
494
    :type fvver: str
495
496
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
497
    :type cltp: int
498
499
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
500
    :type devid: str
501
    """
502 5
    devid = tcl_default_id(devid)
503 5
    posturl = "http://{0}/download_request.php".format(tcl_master())
504 5
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "tv": tvver, "fw_id": fwid, "salt": salt, "vk": vkh, "cltp": cltp}
505 5
    if mode == 4:
506 5
        params["foot"] = 1
507 5
    return posturl, params
508
509
510 5
@pem_wrapper
511 5
@try_try_again
512 5
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
513
    """
514
    Check TCL server for download URLs.
515
516
    :param curef: PRD of the phone variant to check.
517
    :type curef: str
518
519
    :param tvver: Target software version.
520
    :type tvver: str
521
522
    :param fwid: Firmware ID for desired download file.
523
    :type fwid: str
524
525
    :param salt: Salt hash.
526
    :type salt: str
527
528
    :param vkh: VDKey-based hash.
529
    :type vkh: str
530
531
    :param session: Requests session object, default is created on the fly.
532
    :type session: requests.Session()
533
534
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
535
    :type mode: int
536
537
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
538
    :type fvver: str
539
540
    :param export: Whether to export XML response to file. Default is False.
541
    :type export: bool
542
    """
543 5
    sess = generic_session(session)
544 5
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
545 5
    req = sess.post(posturl, data=params)
546 5
    if req.status_code == 200:
547 5
        req.encoding = "utf-8"
548 5
        response = req.text
549 5
        if export:
550 5
            dump_tcl_xml(response)
551
    else:
552 5
        response = None
553 5
    return response
554
555
556 5
def parse_tcl_download_request(body, mode=4):
557
    """
558
    Extract file URL and encrypt slave URL from TCL update server response.
559
560
    :param data: The data to parse.
561
    :type data: str
562
563
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
564
    :type mode: int
565
    """
566 5
    root = ElementTree.fromstring(body)
567 5
    slavelist = root.find("SLAVE_LIST").findall("SLAVE")
568 5
    slave = random.choice(slavelist).text
569 5
    dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
570 5
    eslave = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
571 5
    encslave = None if mode == 2 or not eslave else random.choice(eslave).text
572 5
    return "http://{0}{1}".format(slave, dlurl), encslave
573
574
575 5
def encrypt_header_prep(address, encslave):
576
    """
577
    Prepare variables for encrypted header check.
578
579
    :param address: File URL minus host.
580
    :type address: str
581
582
    :param encslave: Server hosting header script.
583
    :type encslave: str
584
    """
585 5
    encs = {b"YWNjb3VudA==" : b"emhlbmdodWEuZ2Fv", b"cGFzc3dvcmQ=": b"cWFydUQ0b2s="}
586 5
    params = {base64.b64decode(key): base64.b64decode(val) for key, val in encs.items()}
587 5
    params[b"address"] = bytes(address, "utf-8")
588 5
    posturl = "http://{0}/encrypt_header.php".format(encslave)
589 5
    return posturl, params
590
591
592 5
@pem_wrapper
593 5
def encrypt_header(address, encslave, session=None):
594
    """
595
    Check encrypted header.
596
597
    :param address: File URL minus host.
598
    :type address: str
599
600
    :param encslave: Server hosting header script.
601
    :type encslave: str
602
603
    :param session: Requests session object, default is created on the fly.
604
    :type session: requests.Session()
605
    """
606 5
    sess = generic_session(session)
607 5
    posturl, params = encrypt_header_prep(address, encslave)
608 5
    req = sess.post(posturl, data=params)
609 5
    if req.status_code == 206:  # partial
610 5
        contentlength = int(req.headers["Content-Length"])
611 5
        sentinel = "HEADER FOUND" if contentlength == 4194320 else "NO HEADER FOUND"
612
    else:
613 5
        sentinel = None
614 5
    return sentinel
615
616
617 5
@pem_wrapper
618
def remote_prd_info():
619
    """
620
    Get list of remote OTA versions.
621
    """
622 5
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
623 5
    req = requests.get(dburl)
624 5
    reqj = req.json()
625 5
    otadict = {val["curef"]: val["last_ota"] for val in reqj.values() if val["last_ota"] is not None}
626 5
    return otadict
627
628
629 5
def cchecker_get_tags(root):
630
    """
631
    Get country and carrier from XML.
632
633
    :param root: ElementTree we're barking up.
634
    :type root: xml.etree.ElementTree.ElementTree
635
    """
636 5
    for child in root:
637 5
        if child.tag == "country":
638 5
            country = child.get("name")
639 5
        if child.tag == "carrier":
640 5
            carrier = child.get("name")
641 5
    return country, carrier
2 ignored issues
show
introduced by
The variable country does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable carrier does not seem to be defined for all execution paths.
Loading history...
642
643
644 5
@pem_wrapper
645 5
def carrier_checker(mcc, mnc, session=None):
646
    """
647
    Query BlackBerry World to map a MCC and a MNC to a country and carrier.
648
649
    :param mcc: Country code.
650
    :type mcc: int
651
652
    :param mnc: Network code.
653
    :type mnc: int
654
655
    :param session: Requests session object, default is created on the fly.
656
    :type session: requests.Session()
657
    """
658 5
    session = generic_session(session)
659 5
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
660 5
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
661 5
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
662 5
    req = session.get(url, headers=user_agent)
663 5
    root = ElementTree.fromstring(req.text)
664 5
    country, carrier = cchecker_get_tags(root)
665 5
    return country, carrier
666
667
668 5
def return_npc(mcc, mnc):
669
    """
670
    Format MCC and MNC into a NPC.
671
672
    :param mcc: Country code.
673
    :type mcc: int
674
675
    :param mnc: Network code.
676
    :type mnc: int
677
    """
678 5
    return "{0}{1}30".format(str(mcc).zfill(3), str(mnc).zfill(3))
679
680
681 5
@pem_wrapper
682 5
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
683
    """
684
    Query BlackBerry servers, check which update is out for a carrier.
685
686
    :param npc: MCC + MNC (see `func:return_npc`)
687
    :type npc: int
688
689
    :param device: Hexadecimal hardware ID.
690
    :type device: str
691
692
    :param upgrade: Whether to use upgrade files. False by default.
693
    :type upgrade: bool
694
695
    :param blitz: Whether or not to create a blitz package. False by default.
696
    :type blitz: bool
697
698
    :param forced: Force a software release.
699
    :type forced: str
700
701
    :param session: Requests session object, default is created on the fly.
702
    :type session: requests.Session()
703
    """
704 5
    session = generic_session(session)
705 5
    upg = "upgrade" if upgrade else "repair"
706 5
    forced = "latest" if forced is None else forced
707 5
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
708 5
    query = '<?xml version="1.0" encoding="UTF-8"?>'
709 5
    query += '<updateDetailRequest version="2.2.1" authEchoTS="1366644680359">'
710 5
    query += "<clientProperties>"
711 5
    query += "<hardware>"
712 5
    query += "<pin>0x2FFFFFB3</pin><bsn>1128121361</bsn>"
713 5
    query += "<imei>004401139269240</imei>"
714 5
    query += "<id>0x{0}</id>".format(device)
715 5
    query += "</hardware>"
716 5
    query += "<network>"
717 5
    query += "<homeNPC>0x{0}</homeNPC>".format(npc)
718 5
    query += "<iccid>89014104255505565333</iccid>"
719 5
    query += "</network>"
720 5
    query += "<software>"
721 5
    query += "<currentLocale>en_US</currentLocale>"
722 5
    query += "<legalLocale>en_US</legalLocale>"
723 5
    query += "</software>"
724 5
    query += "</clientProperties>"
725 5
    query += "<updateDirectives>"
726 5
    query += '<allowPatching type="REDBEND">true</allowPatching>'
727 5
    query += "<upgradeMode>{0}</upgradeMode>".format(upg)
728 5
    query += "<provideDescriptions>false</provideDescriptions>"
729 5
    query += "<provideFiles>true</provideFiles>"
730 5
    query += "<queryType>NOTIFICATION_CHECK</queryType>"
731 5
    query += "</updateDirectives>"
732 5
    query += "<pollType>manual</pollType>"
733 5
    query += "<resultPackageSetCriteria>"
734 5
    query += '<softwareRelease softwareReleaseVersion="{0}" />'.format(forced)
735 5
    query += "<releaseIndependent>"
736 5
    query += '<packageType operation="include">application</packageType>'
737 5
    query += "</releaseIndependent>"
738 5
    query += "</resultPackageSetCriteria>"
739 5
    query += "</updateDetailRequest>"
740 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
741 5
    req = session.post(url, headers=header, data=query)
742 5
    return parse_carrier_xml(req.text, blitz)
743
744
745 5
def carrier_swver_get(root):
746
    """
747
    Get software release from carrier XML.
748
749
    :param root: ElementTree we're barking up.
750
    :type root: xml.etree.ElementTree.ElementTree
751
    """
752 5
    for child in root.iter("softwareReleaseMetadata"):
753 5
        swver = child.get("softwareReleaseVersion")
754 5
    return swver
1 ignored issue
show
introduced by
The variable swver does not seem to be defined in case the for loop on line 752 is not entered. Are you sure this can never be the case?
Loading history...
755
756
757 5
def carrier_child_fileappend(child, files, baseurl, blitz=False):
758
    """
759
    Append bar file links to a list from a child element.
760
761
    :param child: Child element in use.
762
    :type child: xml.etree.ElementTree.Element
763
764
    :param files: Filelist.
765
    :type files: list(str)
766
767
    :param baseurl: Base URL, URL minus the filename.
768
    :type baseurl: str
769
770
    :param blitz: Whether or not to create a blitz package. False by default.
771
    :type blitz: bool
772
    """
773 5
    if not blitz:
774 5
        files.append(baseurl + child.get("path"))
775
    else:
776 5
        if child.get("type") not in ["system:radio", "system:desktop", "system:os"]:
777 5
            files.append(baseurl + child.get("path"))
778 5
    return files
779
780
781 5
def carrier_child_finder(root, files, baseurl, blitz=False):
782
    """
783
    Extract filenames, radio and OS from child elements.
784
785
    :param root: ElementTree we're barking up.
786
    :type root: xml.etree.ElementTree.ElementTree
787
788
    :param files: Filelist.
789
    :type files: list(str)
790
791
    :param baseurl: Base URL, URL minus the filename.
792
    :type baseurl: str
793
794
    :param blitz: Whether or not to create a blitz package. False by default.
795
    :type blitz: bool
796
    """
797 5
    osver = radver = ""
798 5
    for child in root.iter("package"):
799 5
        files = carrier_child_fileappend(child, files, baseurl, blitz)
800 5
        if child.get("type") == "system:radio":
801 5
            radver = child.get("version")
802 5
        elif child.get("type") == "system:desktop":
803 5
            osver = child.get("version")
804 5
        elif child.get("type") == "system:os":
805 5
            osver = child.get("version")
806 5
    return osver, radver, files
807
808
809 5
def parse_carrier_xml(data, blitz=False):
810
    """
811
    Parse the response to a carrier update request and return the juicy bits.
812
813
    :param data: The data to parse.
814
    :type data: xml
815
816
    :param blitz: Whether or not to create a blitz package. False by default.
817
    :type blitz: bool
818
    """
819 5
    root = ElementTree.fromstring(data)
820 5
    sw_exists = root.find('./data/content/softwareReleaseMetadata')
821 5
    swver = "N/A" if sw_exists is None else ""
822 5
    if sw_exists is not None:
823 5
        swver = carrier_swver_get(root)
824 5
    files = []
825 5
    package_exists = root.find('./data/content/fileSets/fileSet')
826 5
    osver = radver = ""
827 5
    if package_exists is not None:
828 5
        baseurl = "{0}/".format(package_exists.get("url"))
829 5
        osver, radver, files = carrier_child_finder(root, files, baseurl, blitz)
830 5
    return(swver, osver, radver, files)
831
832
833 5
@pem_wrapper
834 5
def sr_lookup(osver, server, session=None):
835
    """
836
    Software release lookup, with choice of server.
837
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.
838
839
    :param osver: OS version to lookup, 10.x.y.zzzz.
840
    :type osver: str
841
842
    :param server: Server to use.
843
    :type server: str
844
845
    :param session: Requests session object, default is created on the fly.
846
    :type session: requests.Session()
847
    """
848 5
    query = '<?xml version="1.0" encoding="UTF-8"?>'
849 5
    query += '<srVersionLookupRequest version="2.0.0"'
850 5
    query += ' authEchoTS="1366644680359">'
851 5
    query += '<clientProperties><hardware>'
852 5
    query += '<pin>0x2FFFFFB3</pin><bsn>1140011878</bsn>'
853 5
    query += '<imei>004402242176786</imei><id>0x8D00240A</id>'
854 5
    query += '<isBootROMSecure>true</isBootROMSecure>'
855 5
    query += '</hardware>'
856 5
    query += '<network>'
857 5
    query += '<vendorId>0x0</vendorId><homeNPC>0x60</homeNPC>'
858 5
    query += '<currentNPC>0x60</currentNPC><ecid>0x1</ecid>'
859 5
    query += '</network>'
860 5
    query += '<software><currentLocale>en_US</currentLocale>'
861 5
    query += '<legalLocale>en_US</legalLocale>'
862 5
    query += '<osVersion>{0}</osVersion>'.format(osver)
863 5
    query += '<omadmEnabled>false</omadmEnabled>'
864 5
    query += '</software></clientProperties>'
865 5
    query += '</srVersionLookupRequest>'
866 5
    reqtext = sr_lookup_poster(query, server, session)
867 5
    packtext = sr_lookup_xmlparser(reqtext)
868 5
    return packtext
869
870
871 5
def sr_lookup_poster(query, server, session=None):
872
    """
873
    Post the XML payload for a software release lookup.
874
875
    :param query: XML payload.
876
    :type query: str
877
878
    :param server: Server to use.
879
    :type server: str
880
881
    :param session: Requests session object, default is created on the fly.
882
    :type session: requests.Session()
883
    """
884 5
    session = generic_session(session)
885 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
886 5
    try:
887 5
        req = session.post(server, headers=header, data=query, timeout=1)
888 5
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
889 5
        reqtext = "SR not in system"
890
    else:
891 5
        reqtext = req.text
892 5
    return reqtext
893
894
895 5
def sr_lookup_xmlparser(reqtext):
896
    """
897
    Take the text of a software lookup request response and parse it as XML.
898
899
    :param reqtext: Response text, hopefully XML formatted.
900
    :type reqtext: str
901
    """
902 5
    try:
903 5
        root = ElementTree.fromstring(reqtext)
904 5
    except ElementTree.ParseError:
905 5
        packtext = "SR not in system"
906
    else:
907 5
        packtext = sr_lookup_extractor(root)
908 5
    return packtext
909
910
911 5
def sr_lookup_extractor(root):
912
    """
913
    Take an ElementTree and extract a software release from it.
914
915
    :param root: ElementTree we're barking up.
916
    :type root: xml.etree.ElementTree.ElementTree
917
    """
918 5
    reg = re.compile(r"(\d{1,4}\.)(\d{1,4}\.)(\d{1,4}\.)(\d{1,4})")
919 5
    packages = root.findall('./data/content/')
920 5
    for package in packages:
921 5
        if package.text is not None:
922 5
            match = reg.match(package.text)
923 5
            packtext = package.text if match else "SR not in system"
924 5
            return packtext
925
926
927 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
928
    """
929
    Run lookups for each server for given OS.
930
931
    :param osv: OS to check.
932
    :type osv: str
933
934
    :param session: Requests session object, default is created on the fly.
935
    :type session: requests.Session()
936
937
    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
938
    :type no2: bool
939
    """
940 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
941 5
        try:
942 5
            results = {
943
                "p": None,
944
                "a1": None,
945
                "a2": None,
946
                "b1": None,
947
                "b2": None
948
            }
949 5
            if no2:
950 5
                del results["a2"]
951 5
                del results["b2"]
952 5
            for key in results:
953 5
                results[key] = xec.submit(sr_lookup, osv, SERVERS[key], session).result()
954 5
            return results
955 5
        except KeyboardInterrupt:
956 5
            xec.shutdown(wait=False)
957
958
959 5
@pem_wrapper
960 5
def available_bundle_lookup(mcc, mnc, device, session=None):
961
    """
962
    Check which software releases were ever released for a carrier.
963
964
    :param mcc: Country code.
965
    :type mcc: int
966
967
    :param mnc: Network code.
968
    :type mnc: int
969
970
    :param device: Hexadecimal hardware ID.
971
    :type device: str
972
973
    :param session: Requests session object, default is created on the fly.
974
    :type session: requests.Session()
975
    """
976 5
    session = generic_session(session)
977 5
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
978 5
    npc = return_npc(mcc, mnc)
979 5
    query = '<?xml version="1.0" encoding="UTF-8"?>'
980 5
    query += '<availableBundlesRequest version="1.0.0" '
981 5
    query += 'authEchoTS="1366644680359">'
982 5
    query += '<deviceId><pin>0x2FFFFFB3</pin></deviceId>'
983 5
    query += '<clientProperties><hardware><id>0x{0}</id>'.format(device)
984 5
    query += '<isBootROMSecure>true</isBootROMSecure></hardware>'
985 5
    query += '<network><vendorId>0x0</vendorId><homeNPC>0x{0}</homeNPC>'.format(npc)
986 5
    query += '<currentNPC>0x{0}</currentNPC></network><software>'.format(npc)
987 5
    query += '<currentLocale>en_US</currentLocale>'
988 5
    query += '<legalLocale>en_US</legalLocale>'
989 5
    query += '<osVersion>10.0.0.0</osVersion>'
990 5
    query += '<radioVersion>10.0.0.0</radioVersion></software>'
991 5
    query += '</clientProperties><updateDirectives><bundleVersionFilter>'
992 5
    query += '</bundleVersionFilter></updateDirectives>'
993 5
    query += '</availableBundlesRequest>'
994 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
995 5
    req = session.post(server, headers=header, data=query)
996 5
    root = ElementTree.fromstring(req.text)
997 5
    package = root.find('./data/content')
998 5
    bundlelist = [child.attrib["version"] for child in package]
999 5
    return bundlelist
1000
1001
1002 5
@pem_wrapper
1003 5
def ptcrb_scraper(ptcrbid, session=None):
1004
    """
1005
    Get the PTCRB results for a given device.
1006
1007
    :param ptcrbid: Numerical ID from PTCRB (end of URL).
1008
    :type ptcrbid: str
1009
1010
    :param session: Requests session object, default is created on the fly.
1011
    :type session: requests.Session()
1012
    """
1013 5
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
1014 5
    sess = generic_session(session)
1015 5
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
1016 5
    sess.headers.update({"User-agent": useragent})
1017 5
    soup = generic_soup_parser(baseurl, sess)
1018 5
    certtable = soup.find_all("table")[1]
1019 5
    tds = certtable.find_all("td")[1::2]  # every other
1020 5
    prelimlist = [tdx.text for tdx in tds]
1021 5
    cleanlist = [ptcrb_item_cleaner(item.strip()) for item in prelimlist]
1022 5
    return cleanlist
1023
1024
1025 5
def space_pad(instring, minlength):
1026
    """
1027
    Pad a string with spaces until it's the minimum length.
1028
1029
    :param instring: String to pad.
1030
    :type instring: str
1031
1032
    :param minlength: Pad while len(instring) < minlength.
1033
    :type minlength: int
1034
    """
1035 5
    while len(instring) < minlength:
1036 5
        instring += " "
1037 5
    return instring
1038
1039
1040 5
def ptcrb_cleaner_multios(item):
1041
    """
1042
    Discard multiple entries for "OS".
1043
1044
    :param item: The item to clean.
1045
    :type item: str
1046
    """
1047 5
    if item.count("OS") > 1:
1048 5
        templist = item.split("OS")
1049 5
        templist[0] = "OS"
1050 5
        item = "".join([templist[0], templist[1]])
1051 5
    return item
1052
1053
1054 5
def ptcrb_cleaner_spaces(item):
1055
    """
1056
    Pad item with spaces to the right length.
1057
1058
    :param item: The item to clean.
1059
    :type item: str
1060
    """
1061 5
    spaclist = item.split(" ")
1062 5
    if len(spaclist) > 1:
1063 5
        spaclist[1] = space_pad(spaclist[1], 11)
1064 5
    if len(spaclist) > 3:
1065 5
        spaclist[3] = space_pad(spaclist[3], 11)
1066 5
    item = " ".join(spaclist)
1067 5
    return item
1068
1069
1070 5
def ptcrb_item_cleaner(item):
1071
    """
1072
    Cleanup poorly formatted PTCRB entries written by an intern.
1073
1074
    :param item: The item to clean.
1075
    :type item: str
1076
    """
1077 5
    item = item.replace("<td>", "")
1078 5
    item = item.replace("</td>", "")
1079 5
    item = item.replace("\n", "")
1080 5
    item = item.replace("SW: OS", "OS")
1081 5
    item = item.replace("Software Version: OS", "OS")
1082 5
    item = item.replace(" (SR", ", SR")
1083 5
    item = re.sub(r"\s?\((.*)$", "", item)
1084 5
    item = re.sub(r"\sSV.*$", "", item)
1085 5
    item = item.replace(")", "")
1086 5
    item = item.replace(". ", ".")
1087 5
    item = item.replace(";", "")
1088 5
    item = item.replace("version", "Version")
1089 5
    item = item.replace("Verison", "Version")
1090 5
    item = ptcrb_cleaner_multios(item)
1091 5
    item = item.replace("SR10", "SR 10")
1092 5
    item = item.replace("SR", "SW Release")
1093 5
    item = item.replace(" Version:", ":")
1094 5
    item = item.replace("Version ", " ")
1095 5
    item = item.replace(":1", ": 1")
1096 5
    item = item.replace(", ", " ")
1097 5
    item = item.replace(",", " ")
1098 5
    item = item.replace("Software", "SW")
1099 5
    item = item.replace("  ", " ")
1100 5
    item = item.replace("OS ", "OS: ")
1101 5
    item = item.replace("Radio ", "Radio: ")
1102 5
    item = item.replace("Release ", "Release: ")
1103 5
    item = ptcrb_cleaner_spaces(item)
1104 5
    item = item.strip()
1105 5
    item = item.replace("\r", "")
1106 5
    if item.startswith("10"):
1107 5
        item = "OS: {0}".format(item)
1108 5
    item = item.replace(":    ", ": ")
1109 5
    item = item.replace(":   ", ": ")
1110 5
    return item
1111
1112
1113 5
@pem_wrapper
1114 5
def kernel_scraper(utils=False, session=None):
1115
    """
1116
    Scrape BlackBerry's GitHub kernel repo for available branches.
1117
1118
    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
1119
    :type utils: bool
1120
1121
    :param session: Requests session object, default is created on the fly.
1122
    :type session: requests.Session()
1123
    """
1124 5
    repo = "android-utils" if utils else "android-linux-kernel"
1125 5
    kernlist = []
1126 5
    sess = generic_session(session)
1127 5
    for page in range(1, 10):
1128 5
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
1129 5
        soup = generic_soup_parser(url, sess)
1130 5
        if soup.find("div", {"class": "no-results-message"}):
1131 5
            break
1132
        else:
1133 5
            text = soup.get_text()
1134 5
            kernlist.extend(re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", text, re.IGNORECASE))
1135 5
    return kernlist
1136
1137
1138 5
def root_generator(folder, build, variant="common"):
1139
    """
1140
    Generate roots for the SHAxxx hash lookup URLs.
1141
1142
    :param folder: Dictionary of variant: loader name pairs.
1143
    :type folder: dict(str: str)
1144
1145
    :param build: Build to check, 3 letters + 3 numbers.
1146
    :type build: str
1147
1148
    :param variant: Autoloader variant. Default is "common".
1149
    :type variant: str
1150
    """
1151
    #Priv specific
1152 5
    privx = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
1153
    #DTEK50 specific
1154 5
    dtek50x = "bbSupport/DTEK50" if build[:3] == "AAF" else "bbfoundation/hashfiles_priv/dtek50"
1155
    #DTEK60 specific
1156 5
    dtek60x = dtek50x  # still uses dtek50 folder, for some reason
1157
    #Pack it up
1158 5
    roots = {"Priv": privx, "DTEK50": dtek50x, "DTEK60": dtek60x}
1159 5
    return roots
1160
1161
1162 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
1163
    """
1164
    Make an Android autoloader/hash URL, on the BB Mobile site.
1165
1166
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1167
    :type method: str
1168
1169
    :param build: Build to check, 3 letters + 3 numbers.
1170
    :type build: str
1171
1172
    :param device: Device to check.
1173
    :type device: str
1174
1175
    :param variant: Autoloader variant. Default is "common".
1176
    :type variant: str
1177
    """
1178 5
    devices = {"KEYone": "qc8953", "Motion": "qc8953"}
1179 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
1180 5
    if method is None:
1181 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
1182
    else:
1183 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
1184 5
    return skel
1185
1186
1187 5
def make_droid_skeleton_og(method, build, device, variant="common"):
1188
    """
1189
    Make an Android autoloader/hash URL, on the original site.
1190
1191
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1192
    :type method: str
1193
1194
    :param build: Build to check, 3 letters + 3 numbers.
1195
    :type build: str
1196
1197
    :param device: Device to check.
1198
    :type device: str
1199
1200
    :param variant: Autoloader variant. Default is "common".
1201
    :type variant: str
1202
    """
1203 5
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
1204 5
    devices = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
1205 5
    roots = root_generator(folder, build, variant)
1206 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
1207 5
    if method is None:
1208 5
        baseurl = "https://bbapps.download.blackberry.com/Priv"
1209 5
        skel = "{1}/{0}.zip".format(base, baseurl)
1210
    else:
1211 5
        baseurl = "https://ca.blackberry.com/content/dam"
1212 5
        skel = "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), baseurl)
1213 5
    return skel
1214
1215
1216 5
def make_droid_skeleton(method, build, device, variant="common"):
1217
    """
1218
    Make an Android autoloader/hash URL.
1219
1220
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1221
    :type method: str
1222
1223
    :param build: Build to check, 3 letters + 3 numbers.
1224
    :type build: str
1225
1226
    :param device: Device to check.
1227
    :type device: str
1228
1229
    :param variant: Autoloader variant. Default is "common".
1230
    :type variant: str
1231
    """
1232
    # No Aurora
1233 5
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
1234 5
    bbmlist = ("KEYone", "Motion")   # BB Mobile
1235 5
    if device in oglist:
1236 5
        skel = make_droid_skeleton_og(method, build, device, variant)
1237 5
    elif device in bbmlist:
1238 5
        skel = make_droid_skeleton_bbm(method, build, device, variant)
1239 5
    return skel
1 ignored issue
show
introduced by
The variable skel does not seem to be defined for all execution paths.
Loading history...
1240
1241
1242 5
def bulk_droid_skeletons(devs, build, method=None):
1243
    """
1244
    Prepare list of Android autoloader/hash URLs.
1245
1246
    :param devs: List of devices.
1247
    :type devs: list(str)
1248
1249
    :param build: Build to check, 3 letters + 3 numbers.
1250
    :type build: str
1251
1252
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1253
    :type method: str
1254
    """
1255 5
    carrier_variants = {
1256
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
1257
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
1258
    }
1259 5
    common_variants = ("common", )  # for single-variant devices
1260 5
    carrier_devices = ("Priv", )  # add KEYone when verified
1261 5
    skels = []
1262 5
    for dev in devs:
1263 5
        varlist = carrier_variants[dev] if dev in carrier_devices else common_variants
1264 5
        for var in varlist:
1265 5
            skel = make_droid_skeleton(method, build, dev, var)
1266 5
            skels.append(skel)
1267 5
    return skels
1268
1269
1270 5
def prepare_droid_list(device):
1271
    """
1272
    Convert single devices to a list, if necessary.
1273
1274
    :param device: Device to check.
1275
    :type device: str
1276
    """
1277 5
    if isinstance(device, list):
1278 5
        devs = device
1279
    else:
1280 5
        devs = [device]
1281 5
    return devs
1282
1283
1284 5
def droid_scanner(build, device, method=None, session=None):
1285
    """
1286
    Check for Android autoloaders on BlackBerry's site.
1287
1288
    :param build: Build to check, 3 letters + 3 numbers.
1289
    :type build: str
1290
1291
    :param device: Device to check.
1292
    :type device: str
1293
1294
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1295
    :type method: str
1296
1297
    :param session: Requests session object, default is created on the fly.
1298
    :type session: requests.Session()
1299
    """
1300 5
    devs = prepare_droid_list(device)
1301 5
    skels = bulk_droid_skeletons(devs, build, method)
1302 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
1303 5
        results = droid_scanner_worker(xec, skels, session)
1304 5
    return results if results else None
1305
1306
1307 5
def droid_scanner_worker(xec, skels, session=None):
1308
    """
1309
    Worker to check for Android autoloaders.
1310
1311
    :param xec: ThreadPoolExecutor instance.
1312
    :type xec: concurrent.futures.ThreadPoolExecutor
1313
1314
    :param skels: List of skeleton formats.
1315
    :type skels: list(str)
1316
1317
    :param session: Requests session object, default is created on the fly.
1318
    :type session: requests.Session()
1319
    """
1320 5
    results = []
1321 5
    for skel in skels:
1322 5
        avail = xec.submit(availability, skel, session)
1323 5
        if avail.result():
1324 5
            results.append(skel)
1325 5
    return results
1326
1327
1328 5
def chunker(iterable, inc):
1329
    """
1330
    Convert an iterable into a list of inc sized lists.
1331
1332
    :param iterable: Iterable to chunk.
1333
    :type iterable: list/tuple/string
1334
1335
    :param inc: Increment; how big each chunk is.
1336
    :type inc: int
1337
    """
1338 5
    chunks = [iterable[x:x+inc] for x in range(0, len(iterable), inc)]
1339 5
    return chunks
1340
1341
1342 5
def unicode_filter(intext):
1343
    """
1344
    Remove Unicode crap.
1345
1346
    :param intext: Text to filter.
1347
    :type intext: str
1348
    """
1349 5
    return intext.replace("\u2013", "").strip()
1350
1351
1352 5
def table_header_filter(ptag):
1353
    """
1354
    Validate p tag, to see if it's relevant.
1355
1356
    :param ptag: P tag.
1357
    :type ptag: bs4.element.Tag
1358
    """
1359 5
    valid = ptag.find("b") and "BlackBerry" in ptag.text and not "experts" in ptag.text
1360 5
    return valid
1361
1362
1363 5
def table_headers(pees):
1364
    """
1365
    Generate table headers from list of p tags.
1366
1367
    :param pees: List of p tags.
1368
    :type pees: list(bs4.element.Tag)
1369
    """
1370 5
    bolds = [x.text for x in pees if table_header_filter(x)]
1371 5
    return bolds
1372
1373
1374 5
@pem_wrapper
1375 5
def loader_page_scraper(session=None):
1376
    """
1377
    Return scraped autoloader pages.
1378
1379
    :param session: Requests session object, default is created on the fly.
1380
    :type session: requests.Session()
1381
    """
1382 5
    session = generic_session(session)
1383 5
    loader_page_scraper_og(session)
1384 5
    loader_page_scraper_bbm(session)
1385
1386
1387 5
def loader_page_scraper_og(session=None):
1388
    """
1389
    Return scraped autoloader page, original site.
1390
1391
    :param session: Requests session object, default is created on the fly.
1392
    :type session: requests.Session()
1393
    """
1394 5
    url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html"
1395 5
    soup = generic_soup_parser(url, session)
1396 5
    tables = soup.find_all("table")
1397 5
    headers = table_headers(soup.find_all("p"))
1398 5
    for idx, table in enumerate(tables):
1399 5
        loader_page_chunker_og(idx, table, headers)
1400
1401
1402 5
def loader_page_scraper_bbm(session=None):
1403
    """
1404
    Return scraped autoloader page, new site.
1405
1406
    :param session: Requests session object, default is created on the fly.
1407
    :type session: requests.Session()
1408
    """
1409 5
    url = "https://www.blackberrymobile.com/support/reload-software/"
1410 5
    soup = generic_soup_parser(url, session)
1411 5
    ulls = soup.find_all("ul", {"class": re.compile("list-two special-.")})[1:]
1412 5
    print("~~~BlackBerry KEYone~~~")
1413 5
    for ull in ulls:
1414 5
        loader_page_chunker_bbm(ull)
1415
1416
1417 5
def loader_page_chunker_og(idx, table, headers):
1418
    """
1419
    Given a loader page table, chunk it into lists of table cells.
1420
1421
    :param idx: Index of enumerating tables.
1422
    :type idx: int
1423
1424
    :param table: HTML table tag.
1425
    :type table: bs4.element.Tag
1426
1427
    :param headers: List of table headers.
1428
    :type headers: list(str)
1429
    """
1430 5
    print("~~~{0}~~~".format(headers[idx]))
1431 5
    chunks = chunker(table.find_all("td"), 4)
1432 5
    for chunk in chunks:
1433 5
        loader_page_printer(chunk)
1434 5
    print(" ")
1435
1436
1437 5
def loader_page_chunker_bbm(ull):
1438
    """
1439
    Given a loader page list, chunk it into lists of list items.
1440
1441
    :param ull: HTML unordered list tag.
1442
    :type ull: bs4.element.Tag
1443
    """
1444 5
    chunks = chunker(ull.find_all("li"), 3)
1445 5
    for chunk in chunks:
1446 5
        loader_page_printer(chunk)
1447
1448
1449 5
def loader_page_printer(chunk):
1450
    """
1451
    Print individual cell texts given a list of table cells.
1452
1453
    :param chunk: List of td tags.
1454
    :type chunk: list(bs4.element.Tag)
1455
    """
1456 5
    key = unicode_filter(chunk[0].text)
1457 5
    ver = unicode_filter(chunk[1].text)
1458 5
    link = unicode_filter(chunk[2].find("a")["href"])
1459 5
    print("{0}\n    {1}: {2}".format(key, ver, link))
1460
1461
1462 5
@pem_wrapper
1463 5
def base_metadata(url, session=None):
1464
    """
1465
    Get BBNDK metadata, base function.
1466
1467
    :param url: URL to check.
1468
    :type url: str
1469
1470
    :param session: Requests session object, default is created on the fly.
1471
    :type session: requests.Session()
1472
    """
1473 5
    session = generic_session(session)
1474 5
    req = session.get(url)
1475 5
    data = req.content
1476 5
    entries = data.split(b"\n")
1477 5
    metadata = [entry.split(b",")[1].decode("utf-8") for entry in entries if entry]
1478 5
    return metadata
1479
1480
1481 5
def base_metadata_url(alternate=None):
1482
    """
1483
    Return metadata URL.
1484
1485
    :param alternate: If the URL is for the simulator metadata. Default is False.
1486
    :type alternate: str
1487
    """
1488 5
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
1489 5
    tail = "{0}/{0}_metadata".format(alternate) if alternate is not None else "metadata"
1490 5
    return "{0}/{1}".format(baseurl, tail)
1491
1492
1493 5
def ndk_metadata(session=None):
1494
    """
1495
    Get BBNDK target metadata.
1496
1497
    :param session: Requests session object, default is created on the fly.
1498
    :type session: requests.Session()
1499
    """
1500 5
    ndkurl = base_metadata_url()
1501 5
    data = base_metadata(ndkurl, session)
1502 5
    metadata = [entry for entry in data if entry.startswith(("10.0", "10.1", "10.2"))]
1503 5
    return metadata
1504
1505
1506 5
def sim_metadata(session=None):
1507
    """
1508
    Get BBNDK simulator metadata.
1509
1510
    :param session: Requests session object, default is created on the fly.
1511
    :type session: requests.Session()
1512
    """
1513 5
    simurl = base_metadata_url("simulator")
1514 5
    metadata = base_metadata(simurl, session)
1515 5
    return metadata
1516
1517
1518 5
def runtime_metadata(session=None):
1519
    """
1520
    Get BBNDK runtime metadata.
1521
1522
    :param session: Requests session object, default is created on the fly.
1523
    :type session: requests.Session()
1524
    """
1525 5
    rturl = base_metadata_url("runtime")
1526 5
    metadata = base_metadata(rturl, session)
1527 5
    return metadata
1528
1529
1530 5
def series_generator(osversion):
1531
    """
1532
    Generate series/branch name from OS version.
1533
1534
    :param osversion: OS version.
1535
    :type osversion: str
1536
    """
1537 5
    splits = osversion.split(".")
1538 5
    return "BB{0}_{1}_{2}".format(*splits[0:3])
1539
1540
1541 5
@pem_wrapper
1542 5
def devalpha_urls(osversion, skel, session=None):
1543
    """
1544
    Check individual Dev Alpha autoloader URLs.
1545
1546
    :param osversion: OS version.
1547
    :type osversion: str
1548
1549
    :param skel: Individual skeleton format to try.
1550
    :type skel: str
1551
1552
    :param session: Requests session object, default is created on the fly.
1553
    :type session: requests.Session()
1554
    """
1555 5
    session = generic_session(session)
1556 5
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
1557 5
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
1558 5
    req = session.head(url)
1559 5
    if req.status_code == 200:
1560 5
        finals = (url, req.headers["content-length"])
1561
    else:
1562 5
        finals = ()
1563 5
    return finals
1564
1565
1566 5
def devalpha_urls_serieshandler(osversion, skeletons):
1567
    """
1568
    Process list of candidate Dev Alpha autoloader URLs.
1569
1570
    :param osversion: OS version.
1571
    :type osversion: str
1572
1573
    :param skeletons: List of skeleton formats to try.
1574
    :type skeletons: list
1575
    """
1576 5
    skels = skeletons
1577 5
    for idx, skel in enumerate(skeletons):
1578 5
        if "<SERIES>" in skel:
1579 5
            skels[idx] = skel.replace("<SERIES>", series_generator(osversion))
1580 5
    return skels
1581
1582
1583 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
1584
    """
1585
    Construct list of valid Dev Alpha autoloader URLs.
1586
1587
    :param osversion: OS version.
1588
    :type osversion: str
1589
1590
    :param skeletons: List of skeleton formats to try.
1591
    :type skeletons: list
1592
1593
    :param xec: ThreadPoolExecutor instance.
1594
    :type xec: concurrent.futures.ThreadPoolExecutor
1595
1596
    :param session: Requests session object, default is created on the fly.
1597
    :type session: requests.Session()
1598
    """
1599 5
    finals = {}
1600 5
    skels = devalpha_urls_serieshandler(osversion, skeletons)
1601 5
    for skel in skels:
1602 5
        final = xec.submit(devalpha_urls, osversion, skel, session).result()
1603 5
        if final:
1604 5
            finals[final[0]] = final[1]
1605 5
    return finals
1606
1607
1608 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
1609
    """
1610
    Get list of valid Dev Alpha autoloader URLs.
1611
1612
    :param osversion: OS version.
1613
    :type osversion: str
1614
1615
    :param skeletons: List of skeleton formats to try.
1616
    :type skeletons: list
1617
1618
    :param session: Requests session object, default is created on the fly.
1619
    :type session: requests.Session()
1620
    """
1621 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
1622 5
        try:
1623 5
            return devalpha_urls_bulk(osversion, skeletons, xec, session)
1624 5
        except KeyboardInterrupt:
1625 5
            xec.shutdown(wait=False)
1626
1627
1628 5
def dev_dupe_dicter(finals):
1629
    """
1630
    Prepare dictionary to clean duplicate autoloaders.
1631
1632
    :param finals: Dict of URL:content-length pairs.
1633
    :type finals: dict(str: str)
1634
    """
1635 5
    revo = {}
1636 5
    for key, val in finals.items():
1637 5
        revo.setdefault(val, set()).add(key)
1638 5
    return revo
1639
1640
1641 5
def dev_dupe_remover(finals, dupelist):
1642
    """
1643
    Filter dictionary of autoloader entries.
1644
1645
    :param finals: Dict of URL:content-length pairs.
1646
    :type finals: dict(str: str)
1647
1648
    :param dupelist: List of duplicate URLs.
1649
    :type duplist: list(str)
1650
    """
1651 5
    for dupe in dupelist:
1652 5
        for entry in dupe:
1653 5
            if "DevAlpha" in entry:
1654 5
                del finals[entry]
1655 5
    return finals
1656
1657
1658 5
def dev_dupe_cleaner(finals):
1659
    """
1660
    Clean duplicate autoloader entries.
1661
1662
    :param finals: Dict of URL:content-length pairs.
1663
    :type finals: dict(str: str)
1664
    """
1665 5
    revo = dev_dupe_dicter(finals)
1666 5
    dupelist = [val for key, val in revo.items() if len(val) > 1]
1667 5
    finals = dev_dupe_remover(finals, dupelist)
1668
    return finals
1669