Test Failed
Branch master (c56792)
by John
02:11
created

bbarchivist.networkutils   F

Complexity

Total Complexity 192

Size/Duplication

Total Lines 1669
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 679
dl 0
loc 1669
rs 0.6314
c 0
b 0
f 0
ccs 657
cts 657
cp 1
wmc 192

80 Functions

Rating   Name   Duplication   Size   Complexity  
A tcl_master() 0 5 1
B download_bootstrap() 0 29 5
A table_headers() 0 9 3
A dump_tcl_xml() 0 12 3
A grab_pem() 0 10 3
A encrypt_header() 0 23 3
B ptcrb_item_cleaner() 0 41 2
B kernel_scraper() 0 23 4
A loader_page_scraper_bbm() 0 13 2
A loader_page_printer() 0 11 1
A encrypt_header_prep() 0 15 2
B vkhash() 0 34 1
A remove_empty_download() 0 9 2
B bulk_droid_skeletons() 0 26 4
A series_generator() 0 9 1
A getcode() 0 18 2
A ptcrb_scraper() 0 21 3
A table_header_filter() 0 9 1
B download_writer() 0 29 4
B download_request_prep() 0 37 2
B tcl_check() 0 32 3
A droid_scanner() 0 21 3
B make_droid_skeleton() 0 24 3
A droid_scanner_worker() 0 19 3
A dev_dupe_remover() 0 15 4
A get_length() 0 20 3
A availability() 0 14 1
A unpack_vdkey() 0 7 1
B parse_carrier_xml() 0 22 4
A parse_tcl_download_request() 0 17 2
A pem_wrapper() 0 14 1
A base_metadata_url() 0 10 2
A make_droid_skeleton_bbm() 0 23 2
A prepare_droid_list() 0 12 2
A remote_prd_info() 0 10 3
A chunker() 0 12 2
A download() 0 21 1
A devalpha_urls() 0 23 2
B tcl_download_request() 0 44 3
A space_pad() 0 13 2
A parse_tcl_check() 0 15 1
A carrier_checker() 0 22 1
B make_droid_skeleton_og() 0 27 2
A ptcrb_cleaner_spaces() 0 14 3
A cchecker_get_tags() 0 13 4
A tcl_default_id() 0 10 2
A root_generator() 0 22 2
B carrier_child_finder() 0 26 5
A loader_page_chunker_og() 0 18 2
A loader_page_scraper() 0 11 1
B try_try_again() 0 23 5
A sim_metadata() 0 10 1
A dev_dupe_cleaner() 0 11 3
A generic_session() 0 9 2
A download_android_tools() 0 17 4
A return_npc() 0 11 1
A devalpha_urls_bootstrap() 0 18 3
A carrier_query() 0 62 3
B check_prep() 0 32 1
A sr_lookup_extractor() 0 14 4
A generic_soup_parser() 0 14 1
B sr_lookup() 0 36 1
A base_metadata() 0 17 3
A loader_page_chunker_bbm() 0 10 2
A sr_lookup_xmlparser() 0 14 3
A ptcrb_cleaner_multios() 0 12 2
A sr_lookup_poster() 0 22 3
A tcl_salt() 0 7 1
B available_bundle_lookup() 0 41 2
A devalpha_urls_serieshandler() 0 15 3
A carrier_child_fileappend() 0 22 3
A ndk_metadata() 0 11 3
A unicode_filter() 0 8 1
A carrier_swver_get() 0 10 2
B sr_lookup_bootstrap() 0 30 5
A runtime_metadata() 0 10 1
A loader_page_scraper_og() 0 13 2
A clean_availability() 0 14 3
A devalpha_urls_bulk() 0 23 3
A dev_dupe_dicter() 0 11 2

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.networkutils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1668/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import base64  # encoding
5 5
import binascii  # encoding
6 5
import concurrent.futures  # multiprocessing/threading
7 5
import glob  # pem file lookup
8 5
import hashlib  # salt
9 5
import os  # filesystem read
10 5
import random  # salt
11 5
import re  # regexes
12 5
import time  # salt
13 5
import zlib  # encoding
14
15 5
import requests  # downloading
16 5
from bs4 import BeautifulSoup  # scraping
17 5
from bbarchivist import utilities  # parse filesize
18 5
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers
19
20 5
try:
21 5
    from defusedxml import ElementTree  # safer XML parsing
22 1
except (ImportError, AttributeError):
23 1
    from xml.etree import ElementTree  # XML parsing
24
25 5
__author__ = "Thurask"
26 5
__license__ = "WTFPL v2"
27 5
__copyright__ = "2015-2018 Thurask"
28
29
30 5
def grab_pem():
    """
    Pick the CA certificate bundle to use.

    A "cacert.pem" sitting in the current working directory wins;
    otherwise the bundle reported by Requests is returned.
    """
    local_candidates = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))
    if not local_candidates:
        return requests.certs.where()  # no local cacerts
    return os.path.abspath(local_candidates[0])  # local cacerts
40
41
42 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    The returned wrapper keeps the name and docstring of the decorated
    function.

    :param method: Method to use.
    :type method: function
    """
    import functools  # local import, used only to preserve metadata

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        """
        Set REQUESTS_CA_BUNDLE before doing function.
        """
        # Re-evaluated on every call, so a cacert.pem dropped into the
        # working directory later on is still picked up.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
56
57
58 5
def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and other common exceptions.

    The returned wrapper keeps the name and docstring of the decorated
    function.

    :param method: Method to use.
    :type method: function
    """
    import functools  # local import, used only to preserve metadata

    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        """
        Try function, try it again up to five times, and leave gracefully.

        Returns the function's result on the first successful attempt,
        or None if every attempt raised one of the absorbed exceptions.
        """
        tries = 5
        for _ in range(tries):
            try:
                return method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue
        return None  # every attempt failed
    return wrapper
81
82
83 5
def generic_session(session=None):
    """
    Hand back the given Requests session, or build a new one if none given.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    if session is not None:
        return session
    return requests.Session()
92
93
94 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and return its body parsed by BeautifulSoup.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    response = generic_session(session).get(url)
    return BeautifulSoup(response.content, "html.parser")
108
109
110 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    Returns 0 when the URL is None, when the connection fails, or when
    the response carries no usable content-length header.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if url is None:
        return 0
    session = generic_session(session)
    try:
        heads = session.head(url)
    except requests.ConnectionError:
        return 0
    try:
        return int(heads.headers['content-length'])
    except (KeyError, ValueError):
        # Header missing (e.g. chunked responses) or not a number.
        return 0
130
131
132 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Fetch one file from a URL into a folder, discarding it if it ends up empty.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
    long_name = url.rsplit('/', 1)[-1]  # last path component of the URL
    short_name = utilities.stripper(long_name)
    target = os.path.join(output_directory, long_name)
    download_writer(url, target, long_name, short_name, session)
    remove_empty_download(target)
153
154
155 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when it has zero length.

    :param fname: File path.
    :type fname: str
    """
    is_empty = os.path.getsize(fname) == 0
    if is_empty:
        os.remove(fname)
164
165
166 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    The output file is always created, even on an HTTP error, so that
    callers can inspect (and remove) an empty result afterwards.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Honour the documented default instead of failing on None.get().
    session = generic_session(session)
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        if req.status_code == 200:  # 200 OK
            # The header is absent on e.g. chunked responses; fall back to
            # "0" (assumes utilities.fsizer accepts numeric strings, as it
            # already receives the raw header value).
            clength = req.headers.get('content-length', "0")
            fsize = utilities.fsizer(clength)
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
            for chunk in req.iter_content(chunk_size=1024):
                file.write(chunk)
        else:
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
195
196
197 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Number of worker threads. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Never more threads than URLs, but at least one: ThreadPoolExecutor
    # raises ValueError for max_workers=0, which an empty list produced.
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
226
227
228 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    Any existing directory of that name is removed, with its contents,
    and recreated before downloading.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    import shutil  # local import: recursive directory removal

    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        # os.removedirs only handles empty directories and also prunes
        # empty parents; rmtree clears a previous download properly.
        shutil.rmtree(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    # Download into the directory that was just prepared, not a fixed name.
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
245
246 5
247 5
@pem_wrapper
def getcode(url, session=None):
    """
    Issue a HEAD request and report the HTTP status code, 404 if unreachable.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    try:
        return int(session.head(url).status_code)
    except requests.ConnectionError:
        return 404
265
266 5
267 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether a URL answers with an acceptable HTTP status code.
    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    code = getcode(url, session)
    if code == 200:
        return True
    return 300 < code <= 308
281
282 5
283
def clean_availability(results, server):
    """
    Return the lookup result for a server plus its availability marker.

    The marker is two spaces when the result is None or
    "SR not in system"; otherwise "PD" for server "p", else the
    upper-cased server name.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    release = results[server.lower()]
    found = release is not None and release != "SR not in system"
    if not found:
        marker = "  "
    elif server == "p":
        marker = "PD"
    else:
        marker = server.upper()
    return release, marker
297
298 5
299
def tcl_master():
    """
    Pick one entry of TCLMASTERS at random.
    """
    chosen = random.choice(TCLMASTERS)
    return chosen
304
305 5
306
def tcl_default_id(devid):
    """
    Return the given device ID, or a placeholder one when it is None.

    :param devid: Return default if this is None.
    :type devid: str
    """
    return "543212345000000" if devid is None else devid
316
317 5
318
def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
    """
    Build the URL and query parameters for a TCL update check.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
    :type cktp: int

    :param rtd: 2 if rooted, 1 if not. Default is 1.
    :type rtd: int

    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
    :type chnl: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    geturl = "http://{0}/check.php".format(tcl_master())
    params = {
        "id": tcl_default_id(devid),
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "cltp": cltp,
        "cktp": cktp,
        "rtd": rtd,
        "chnl": chnl,
    }
    return geturl, params
350
351 5
352 5
@pem_wrapper
@try_try_again
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
    """
    Ask a TCL server for updates and return the response text.

    Returns None when the server does not answer with HTTP 200.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    geturl, params = check_prep(curef, mode, fvver)
    req = sess.get(geturl, params=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    response = req.text
    if export:
        dump_tcl_xml(response)
    return response
384
385 5
386
def parse_tcl_check(data):
    """
    Pull version, firmware ID and file details out of a TCL check response.

    Returns a tuple of (target version, firmware ID, filename, file size,
    file checksum), all as found in the XML text.

    :param data: The data to parse.
    :type data: str
    """
    root = ElementTree.fromstring(data)
    tvver = root.find("VERSION").find("TV").text
    firmware = root.find("FIRMWARE")
    fwid = firmware.find("FW_ID").text
    fileinfo = firmware.find("FILESET").find("FILE")
    filename, filesize, filehash = [
        fileinfo.find(tag).text for tag in ("FILENAME", "SIZE", "CHECKSUM")
    ]
    return tvver, fwid, filename, filesize, filehash
401
402 5
403
def tcl_salt():
    """
    Build a salt string: current time in milliseconds plus six random digits.
    """
    millis = round(time.time() * 1000)
    return "{0}{1:06d}".format(millis, random.randint(0, 999999))
410
411 5
412
def dump_tcl_xml(xmldata):
    """
    Write XML responses to output directory.

    The file goes into a "logs" folder under the current working
    directory and is named after a freshly generated salt.

    :param xmldata: Response XML.
    :type xmldata: str
    """
    logdir = os.path.join(os.getcwd(), "logs")
    # exist_ok avoids the check-then-create race when several callers
    # dump responses at the same time.
    os.makedirs(logdir, exist_ok=True)
    outfile = os.path.join(logdir, "{0}.xml".format(tcl_salt()))
    with open(outfile, "w", encoding="utf-8") as afile:
        afile.write(xmldata)
424
425 5
426
def unpack_vdkey():
    """
    Decode the embedded key: base64, then zlib, then UTF-8 text.
    """
    packed = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
    compressed = base64.b64decode(packed)
    return zlib.decompress(compressed).decode("utf-8")
433
434 5
435
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Compute the SHA-1 hex digest of the TCL request query plus the embedded key.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    secret = unpack_vdkey()
    devid = tcl_default_id(devid)
    template = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}"
    query = template.format(devid, salt, curef, fvver, tvver, "Firmware", fwid, mode, cltp, secret)
    return hashlib.sha1(query.encode("utf-8")).hexdigest()
469
470 5
471
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Build the URL and form parameters for a TCL download request.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    devid = tcl_default_id(devid)
    posturl = "http://{0}/download_request.php".format(tcl_master())
    params = {
        "id": devid,
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "tv": tvver,
        "fw_id": fwid,
        "salt": salt,
        "vk": vkh,
        "cltp": cltp,
    }
    if mode == 4:
        params.update(foot=1)  # extra field sent only in autoloader mode
    return posturl, params
508
509 5
510 5
@pem_wrapper
@try_try_again
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
    """
    Ask a TCL server for download URLs and return the response text.

    Returns None when the server does not answer with HTTP 200.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
    req = sess.post(posturl, data=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    response = req.text
    if export:
        dump_tcl_xml(response)
    return response
554
555 5
556
def parse_tcl_download_request(body, mode=4):
    """
    Pull the file URL and an encrypt slave out of a TCL download response.

    Returns a tuple of (full download URL, encrypt slave). The encrypt
    slave is None in mode 2 or when the response lists none.

    :param body: The data to parse.
    :type body: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int
    """
    root = ElementTree.fromstring(body)
    slave_list = root.find("SLAVE_LIST")
    slave = random.choice(slave_list.findall("SLAVE")).text
    dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
    eslaves = slave_list.findall("ENCRYPT_SLAVE")
    if mode == 2 or not eslaves:
        encslave = None
    else:
        encslave = random.choice(eslaves).text
    return "http://{0}{1}".format(slave, dlurl), encslave
573
574 5
575
def encrypt_header_prep(address, encslave):
    """
    Build the URL and form parameters for an encrypted header check.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str
    """
    encoded_pairs = (
        (b"YWNjb3VudA==", b"emhlbmdodWEuZ2Fv"),
        (b"cGFzc3dvcmQ=", b"cWFydUQ0b2s="),
    )
    params = {}
    for key, val in encoded_pairs:
        params[base64.b64decode(key)] = base64.b64decode(val)
    params[b"address"] = bytes(address, "utf-8")
    posturl = "http://{0}/encrypt_header.php".format(encslave)
    return posturl, params
590
591 5
592 5
@pem_wrapper
def encrypt_header(address, encslave, session=None):
    """
    Post an encrypted header check and classify the answer.

    Returns "HEADER FOUND" or "NO HEADER FOUND" for an HTTP 206 reply,
    depending on its Content-Length, and None for any other status.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    posturl, params = encrypt_header_prep(address, encslave)
    req = sess.post(posturl, data=params)
    if req.status_code != 206:  # anything but partial content
        return None
    if int(req.headers["Content-Length"]) == 4194320:
        return "HEADER FOUND"
    return "NO HEADER FOUND"
615
616 5
617
@pem_wrapper
def remote_prd_info():
    """
    Fetch the remote JSON listing and map each curef to its last OTA version.

    Entries whose "last_ota" is None are left out.
    """
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
    payload = requests.get(dburl).json()
    otadict = {}
    for entry in payload.values():
        if entry["last_ota"] is not None:
            otadict[entry["curef"]] = entry["last_ota"]
    return otadict
627
628 5
629
def cchecker_get_tags(root):
    """
    Get country and carrier from XML.

    Each value is the "name" attribute of the last direct child with
    the matching tag, or None when no such child exists.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    # Defaults keep the return well-defined when a tag is absent.
    country = carrier = None
    for child in root:
        if child.tag == "country":
            country = child.get("name")
        elif child.tag == "carrier":
            carrier = child.get("name")
    return country, carrier
2 ignored issues
show
introduced by
The variable country does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable carrier does not seem to be defined for all execution paths.
Loading history...
642
643 5
644 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Ask BlackBerry World which country and carrier a MCC/MNC pair maps to.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
    response = session.get(url, headers={'User-agent': 'AppWorld/5.1.0.60'})
    country, carrier = cchecker_get_tags(ElementTree.fromstring(response.text))
    return country, carrier
666 5
667
668
def return_npc(mcc, mnc):
    """
    Combine MCC and MNC, each zero-padded to three characters, plus "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    padded = [str(code).zfill(3) for code in (mcc, mnc)]
    return "".join(padded) + "30"
679 5
680 5
681
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an update-details request to BlackBerry and parse the reply.

    The return value is whatever :func:`parse_carrier_xml` produces for
    the response text.

    :param npc: MCC + MNC (see `func:return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    upg = "upgrade" if upgrade else "repair"
    forced = "latest" if forced is None else forced
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
    fragments = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<updateDetailRequest version="2.2.1" authEchoTS="1366644680359">',
        "<clientProperties>",
        "<hardware>",
        "<pin>0x2FFFFFB3</pin><bsn>1128121361</bsn>",
        "<imei>004401139269240</imei>",
        "<id>0x{0}</id>".format(device),
        "</hardware>",
        "<network>",
        "<homeNPC>0x{0}</homeNPC>".format(npc),
        "<iccid>89014104255505565333</iccid>",
        "</network>",
        "<software>",
        "<currentLocale>en_US</currentLocale>",
        "<legalLocale>en_US</legalLocale>",
        "</software>",
        "</clientProperties>",
        "<updateDirectives>",
        '<allowPatching type="REDBEND">true</allowPatching>',
        "<upgradeMode>{0}</upgradeMode>".format(upg),
        "<provideDescriptions>false</provideDescriptions>",
        "<provideFiles>true</provideFiles>",
        "<queryType>NOTIFICATION_CHECK</queryType>",
        "</updateDirectives>",
        "<pollType>manual</pollType>",
        "<resultPackageSetCriteria>",
        '<softwareRelease softwareReleaseVersion="{0}" />'.format(forced),
        "<releaseIndependent>",
        '<packageType operation="include">application</packageType>',
        "</releaseIndependent>",
        "</resultPackageSetCriteria>",
        "</updateDetailRequest>",
    ]
    query = "".join(fragments)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(url, headers=header, data=query)
    return parse_carrier_xml(req.text, blitz)
743 5
744
745
def carrier_swver_get(root):
    """
    Get software release from carrier XML.

    Returns the "softwareReleaseVersion" attribute of the last
    softwareReleaseMetadata element, or "N/A" (the same placeholder
    :func:`parse_carrier_xml` uses) when there is no such element.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    swver = "N/A"  # default keeps the return defined if the loop never runs
    for child in root.iter("softwareReleaseMetadata"):
        swver = child.get("softwareReleaseVersion")
    return swver
1 ignored issue
show
introduced by
The variable swver does not seem to be defined in case the for loop on line 752 is not entered. Are you sure this can never be the case?
Loading history...
755 5
756
757
def carrier_child_fileappend(child, files, baseurl, blitz=False):
    """
    Add a child element's bar file link to the list, which is also returned.

    In blitz mode, radio, desktop and OS system packages are left out.

    :param child: Child element in use.
    :type child: xml.etree.ElementTree.Element

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    system_types = ("system:radio", "system:desktop", "system:os")
    skipped = blitz and child.get("type") in system_types
    if not skipped:
        files.append(baseurl + child.get("path"))
    return files
779 5
780
781
def carrier_child_finder(root, files, baseurl, blitz=False):
    """
    Walk the package elements, collecting file links and OS/radio versions.

    Returns a tuple of (OS version, radio version, file list); versions
    stay empty strings when no matching package is found.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    osver = radver = ""
    for package in root.iter("package"):
        files = carrier_child_fileappend(package, files, baseurl, blitz)
        kind = package.get("type")
        if kind == "system:radio":
            radver = package.get("version")
        elif kind in ("system:desktop", "system:os"):
            osver = package.get("version")
    return osver, radver, files
807 5
808
809
def parse_carrier_xml(data, blitz=False):
    """
    Parse a carrier update response into software/OS/radio versions and files.

    The software version is "N/A" when the response has no
    softwareReleaseMetadata element; OS/radio versions stay empty and the
    file list stays empty when it has no fileSet element.

    :param data: The data to parse.
    :type data: xml

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    root = ElementTree.fromstring(data)
    if root.find('./data/content/softwareReleaseMetadata') is None:
        swver = "N/A"
    else:
        swver = carrier_swver_get(root)
    osver, radver, files = "", "", []
    fileset = root.find('./data/content/fileSets/fileSet')
    if fileset is not None:
        baseurl = "{0}/".format(fileset.get("url"))
        osver, radver, files = carrier_child_finder(root, files, baseurl, blitz)
    return (swver, osver, radver, files)
831 5
832 5
833
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Look up the software release for an OS version on the chosen server.
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # The payload is assembled from fixed fragments; only osVersion varies.
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<srVersionLookupRequest version="2.0.0"',
        ' authEchoTS="1366644680359">',
        '<clientProperties><hardware>',
        '<pin>0x2FFFFFB3</pin><bsn>1140011878</bsn>',
        '<imei>004402242176786</imei><id>0x8D00240A</id>',
        '<isBootROMSecure>true</isBootROMSecure>',
        '</hardware>',
        '<network>',
        '<vendorId>0x0</vendorId><homeNPC>0x60</homeNPC>',
        '<currentNPC>0x60</currentNPC><ecid>0x1</ecid>',
        '</network>',
        '<software><currentLocale>en_US</currentLocale>',
        '<legalLocale>en_US</legalLocale>',
        '<osVersion>{0}</osVersion>'.format(osver),
        '<omadmEnabled>false</omadmEnabled>',
        '</software></clientProperties>',
        '</srVersionLookupRequest>',
    ]
    query = "".join(parts)
    reqtext = sr_lookup_poster(query, server, session)
    return sr_lookup_xmlparser(reqtext)
869 5
870
871
def sr_lookup_poster(query, server, session=None):
    """
    POST the software release lookup payload and return the response text.

    A timeout or connection error yields "SR not in system" instead.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        response = session.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return response.text
893 5
894
895
def sr_lookup_xmlparser(reqtext):
    """
    Parse a software lookup response as XML and extract the release.

    Text that is not well-formed XML yields "SR not in system".

    :param reqtext: Response text, hopefully XML formatted.
    :type reqtext: str
    """
    try:
        tree = ElementTree.fromstring(reqtext)
    except ElementTree.ParseError:
        return "SR not in system"
    return sr_lookup_extractor(tree)
909 5
910
911
def sr_lookup_extractor(root):
    """
    Take an ElementTree and extract a software release from it.

    Only the first child of ./data/content that has text is considered: its
    text is returned if it starts with a four-part dotted version, otherwise
    "SR not in system" is returned. If no child has any text at all, the
    result is also "SR not in system" rather than an implicit None.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    reg = re.compile(r"(\d{1,4}\.)(\d{1,4}\.)(\d{1,4}\.)(\d{1,4})")
    for package in root.findall('./data/content/'):
        if package.text is not None:
            return package.text if reg.match(package.text) else "SR not in system"
    # Nothing usable in the response: report it like every other failure path.
    return "SR not in system"
925 5
926
927
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run a software release lookup against each server for the given OS.

    Returns a dict keyed by server code; returns None if interrupted from
    the keyboard.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    keys = ("p", "a1", "b1") if no2 else ("p", "a1", "a2", "b1", "b2")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            results = {}
            for key in keys:
                # result() is awaited before the next lookup is submitted
                future = xec.submit(sr_lookup, osv, SERVERS[key], session)
                results[key] = future.result()
            return results
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
957 5
958 5
959
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    List the software release versions the server reports for a carrier.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
    npc = return_npc(mcc, mnc)
    # Fixed request template; only the hardware ID and NPC are filled in.
    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<availableBundlesRequest version="1.0.0" ',
        'authEchoTS="1366644680359">',
        '<deviceId><pin>0x2FFFFFB3</pin></deviceId>',
        '<clientProperties><hardware><id>0x{0}</id>'.format(device),
        '<isBootROMSecure>true</isBootROMSecure></hardware>',
        '<network><vendorId>0x0</vendorId><homeNPC>0x{0}</homeNPC>'.format(npc),
        '<currentNPC>0x{0}</currentNPC></network><software>'.format(npc),
        '<currentLocale>en_US</currentLocale>',
        '<legalLocale>en_US</legalLocale>',
        '<osVersion>10.0.0.0</osVersion>',
        '<radioVersion>10.0.0.0</radioVersion></software>',
        '</clientProperties><updateDirectives><bundleVersionFilter>',
        '</bundleVersionFilter></updateDirectives>',
        '</availableBundlesRequest>',
    ]
    query = "".join(parts)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(server, headers=header, data=query)
    content = ElementTree.fromstring(req.text).find('./data/content')
    return [bundle.attrib["version"] for bundle in content]
1000 5
1001 5
1002
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Fetch the PTCRB device page and return its cleaned certification entries.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    sess = generic_session(session)
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
    sess.headers.update({"User-agent": useragent})
    soup = generic_soup_parser(baseurl, sess)
    # Second table on the page; every other cell starting from the second one.
    cells = soup.find_all("table")[1].find_all("td")[1::2]
    return [ptcrb_item_cleaner(cell.text.strip()) for cell in cells]
1023
1024
1025
def space_pad(instring, minlength):
    """
    Right-pad a string with spaces up to a minimum length.

    Strings already at least that long come back unchanged.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    return instring.ljust(minlength)
1038
1039
1040
def ptcrb_cleaner_multios(item):
    """
    Keep only the first "OS" entry when an item mentions "OS" more than once.

    The result is "OS" followed by the text between the first and second
    occurrence; anything before the first or after the second is dropped.

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") <= 1:
        return item
    return "OS" + item.split("OS")[1]
1052
1053
1054
def ptcrb_cleaner_spaces(item):
    """
    Pad the second and fourth space-separated words of an item to 11 characters.

    :param item: The item to clean.
    :type item: str
    """
    words = item.split(" ")
    for pos in (1, 3):
        if len(words) > pos:
            words[pos] = space_pad(words[pos], 11)
    return " ".join(words)
1068
1069
1070
def ptcrb_item_cleaner(item):
    """
    Normalise the text of a raw PTCRB entry via an ordered series of substitutions.

    The order of the replacements matters; later steps rely on earlier ones.

    :param item: The item to clean.
    :type item: str
    """
    def swap_all(text, pairs):
        # Apply plain-text replacements strictly in the order given.
        for old, new in pairs:
            text = text.replace(old, new)
        return text

    item = swap_all(item, (
        ("<td>", ""),
        ("</td>", ""),
        ("\n", ""),
        ("SW: OS", "OS"),
        ("Software Version: OS", "OS"),
        (" (SR", ", SR"),
    ))
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    item = swap_all(item, (
        (")", ""),
        (". ", "."),
        (";", ""),
        ("version", "Version"),
        ("Verison", "Version"),
    ))
    item = ptcrb_cleaner_multios(item)
    item = swap_all(item, (
        ("SR10", "SR 10"),
        ("SR", "SW Release"),
        (" Version:", ":"),
        ("Version ", " "),
        (":1", ": 1"),
        (", ", " "),
        (",", " "),
        ("Software", "SW"),
        ("  ", " "),
        ("OS ", "OS: "),
        ("Radio ", "Radio: "),
        ("Release ", "Release: "),
    ))
    item = ptcrb_cleaner_spaces(item)
    item = item.strip().replace("\r", "")
    if item.startswith("10"):
        # Bare version number: label it as the OS version
        item = "OS: {0}".format(item)
    return swap_all(item, ((":    ", ": "), (":   ", ": ")))
1111 5
1112
1113
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Scrape BlackBerry's GitHub kernel repo for available branches.

    Pages 1 through 9 of the branch listing are fetched, stopping early at
    the first page that shows a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    repo = "android-utils" if utils else "android-linux-kernel"
    sess = generic_session(session)
    branches = []
    for page in range(1, 10):
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        branches += re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", soup.get_text(), re.IGNORECASE)
    return branches
1136
1137
1138
def root_generator(folder, build, variant="common"):
    """
    Build the per-device root folders used in the SHAxxx hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv: folder depends on the autoloader variant
    priv_root = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50: AAF builds live in a different place
    if build.startswith("AAF"):
        dtek_root = "bbSupport/DTEK50"
    else:
        dtek_root = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 shares the DTEK50 folder
    return {"Priv": priv_root, "DTEK50": dtek_root, "DTEK60": dtek_root}
1160
1161
1162
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Build an Android autoloader or hash URL for the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    devices = {"KEYone": "qc8953", "Motion": "qc8953"}
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
    if method is None:
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
1185
1186
1187
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Build an Android autoloader or hash URL for the original BlackBerry site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    devices = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    # Roots are computed up front, even for plain OS links
    roots = root_generator(folder, build, variant)
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
    if method is None:
        baseurl = "https://bbapps.download.blackberry.com/Priv"
        return "{1}/{0}.zip".format(base, baseurl)
    baseurl = "https://ca.blackberry.com/content/dam"
    return "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), baseurl)
1214
1215
1216
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    Dispatches to the original-site or BB Mobile builder depending on the
    device name.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is in neither supported list.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion")   # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # Previously this fell through to an unbound local; fail with a clear message.
    raise ValueError("Unsupported device: {0!r}".format(device))
1 ignored issue
show
introduced by
The variable skel does not seem to be defined for all execution paths.
Loading history...
1240
1241
1242
def bulk_droid_skeletons(devs, build, method=None):
    """
    Build the Android autoloader/hash URLs for every device and variant.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    common_variants = ("common", )  # for single-variant devices
    carrier_devices = ("Priv", )  # add KEYone when verified

    def variants_for(dev):
        # Only devices listed as carrier devices get their carrier variants.
        return carrier_variants[dev] if dev in carrier_devices else common_variants

    return [
        make_droid_skeleton(method, build, dev, var)
        for dev in devs
        for var in variants_for(dev)
    ]
1268
1269
1270
def prepare_droid_list(device):
    """
    Wrap a single device in a list; pass an existing list through untouched.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
1282
1283
1284
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    Returns the list of candidate URLs that were found to be available, or
    None when there are none (including when no candidates were generated).

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # ThreadPoolExecutor rejects max_workers=0, so bail out before creating one
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None
1305
1306
1307
def droid_scanner_worker(xec, skels, session=None):
    """
    Check each candidate URL for availability and keep the ones that exist.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Each submission's result() is awaited before the next URL is submitted.
    return [
        skel for skel in skels
        if xec.submit(availability, skel, session).result()
    ]
1326
1327
1328
def chunker(iterable, inc):
    """
    Split a sliceable sequence into consecutive pieces of length inc.

    The last piece is shorter when the length is not a multiple of inc.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    chunks = []
    for start in range(0, len(iterable), inc):
        chunks.append(iterable[start:start + inc])
    return chunks
1340
1341
1342
def unicode_filter(intext):
    """
    Drop en dashes (U+2013) from the text, then trim surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    without_dashes = intext.replace("\u2013", "")
    return without_dashes.strip()
1350
1351
1352
def table_header_filter(ptag):
    """
    Decide whether a p tag is a relevant table header.

    A tag qualifies when it contains a b tag and its text mentions
    "BlackBerry" but not "experts".

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        # Hand back the falsy lookup result itself, as the and-chain would
        return bold
    if "BlackBerry" not in ptag.text:
        return False
    return "experts" not in ptag.text
1361
1362
1363
def table_headers(pees):
    """
    Collect the text of every p tag that passes the table header filter.

    :param pees: List of p tags.
    :type pees: list(bs4.element.Tag)
    """
    headers = []
    for ptag in pees:
        if table_header_filter(ptag):
            headers.append(ptag.text)
    return headers
1372
1373
1374
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape both autoloader pages, original site first, then the new site.

    Nothing is returned; the two scrapers are called for their effects.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    for scrape in (loader_page_scraper_og, loader_page_scraper_bbm):
        scrape(sess)
1385
1386
1387
def loader_page_scraper_og(session=None):
    """
    Scrape the original site's autoloader page and print each table.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html"
    soup = generic_soup_parser(url, session)
    tables = soup.find_all("table")
    headers = table_headers(soup.find_all("p"))
    idx = 0
    for table in tables:
        # Tables and headers are paired up by position
        loader_page_chunker_og(idx, table, headers)
        idx += 1
1400
1401
1402
def loader_page_scraper_bbm(session=None):
    """
    Scrape the new site's autoloader page and print each list.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://www.blackberrymobile.com/support/reload-software/"
    soup = generic_soup_parser(url, session)
    matching = soup.find_all("ul", {"class": re.compile("list-two special-.")})
    ulls = matching[1:]  # the first matching list is skipped
    print("~~~BlackBerry KEYone~~~")
    for ull in ulls:
        loader_page_chunker_bbm(ull)
1415
1416
1417
def loader_page_chunker_og(idx, table, headers):
    """
    Print a table's header, then its cells in groups of four.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    print("~~~{0}~~~".format(headers[idx]))
    for cells in chunker(table.find_all("td"), 4):
        loader_page_printer(cells)
    print(" ")
1435
1436
1437
def loader_page_chunker_bbm(ull):
    """
    Print a list's items in groups of three.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    items = ull.find_all("li")
    for group in chunker(items, 3):
        loader_page_printer(group)
1447
1448
1449
def loader_page_printer(chunk):
    """
    Print the name, version and link held in the first three cells of a chunk.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    # Third cell holds the anchor whose href is the download link
    raw_fields = (chunk[0].text, chunk[1].text, chunk[2].find("a")["href"])
    key, ver, link = [unicode_filter(field) for field in raw_fields]
    print("{0}\n    {1}: {2}".format(key, ver, link))
1460
1461
1462
@pem_wrapper
def base_metadata(url, session=None):
    """
    Get BBNDK metadata, base function.

    The second comma-separated field of every non-empty line is returned.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    payload = session.get(url).content
    metadata = []
    for line in payload.split(b"\n"):
        if line:
            metadata.append(line.split(b",")[1].decode("utf-8"))
    return metadata
1479
1480
1481
def base_metadata_url(alternate=None):
    """
    Return the BBNDK metadata URL, optionally for an alternate metadata set.

    :param alternate: Name of the alternate set (e.g. "simulator"). Default is None.
    :type alternate: str
    """
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    if alternate is None:
        tail = "metadata"
    else:
        tail = "{0}/{0}_metadata".format(alternate)
    return "{0}/{1}".format(baseurl, tail)
1491
1492
1493
def ndk_metadata(session=None):
    """
    Get BBNDK target metadata, limited to 10.0, 10.1 and 10.2 entries.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted = ("10.0", "10.1", "10.2")
    entries = base_metadata(base_metadata_url(), session)
    return [entry for entry in entries if entry.startswith(wanted)]
1504
1505
1506 5
def sim_metadata(session=None):
    """
    Fetch the BBNDK simulator metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("simulator"), session)
1516
1517 5
1518 5
def runtime_metadata(session=None):
    """
    Fetch the BBNDK runtime metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("runtime"), session)
1528
1529
1530
def series_generator(osversion):
    """
    Build the series/branch name from the first three parts of an OS version.

    :param osversion: OS version.
    :type osversion: str
    """
    leading = osversion.split(".")[:3]
    return "BB{0}_{1}_{2}".format(*leading)
1539 5
1540
1541 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Check a single Dev Alpha autoloader URL with a HEAD request.

    Returns a (url, content-length) tuple on HTTP 200, otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
    req = session.head(url)
    if req.status_code != 200:
        return ()
    return (url, req.headers["content-length"])
1564
1565
1566
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Every "<SERIES>" placeholder is replaced with the series name for the
    given OS version. A new list is returned; the caller's list is left
    untouched so its templates can be reused for another OS version.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list
    """
    # The series is only generated for skeletons that actually need it.
    return [
        skel.replace("<SERIES>", series_generator(osversion)) if "<SERIES>" in skel else skel
        for skel in skeletons
    ]
1581 5
1582 5
1583 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Build a dict of valid Dev Alpha autoloader URLs and their content lengths.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    finals = {}
    for skel in devalpha_urls_serieshandler(osversion, skeletons):
        # Each check's result() is awaited before the next one is submitted
        hit = xec.submit(devalpha_urls, osversion, skel, session).result()
        if hit:
            finals[hit[0]] = hit[1]
    return finals
1606
1607 5
1608
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Get valid Dev Alpha autoloader URLs using a five-worker thread pool.

    Returns None if interrupted from the keyboard.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            finals = devalpha_urls_bulk(osversion, skeletons, xec, session)
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
        else:
            return finals
1626
1627
1628
def dev_dupe_dicter(finals):
    """
    Invert a URL:content-length dict into content-length: set of URLs.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    revo = {}
    for url, length in finals.items():
        if length not in revo:
            revo[length] = set()
        revo[length].add(url)
    return revo
1639
1640
1641
def dev_dupe_remover(finals, dupelist):
    """
    Delete duplicate entries whose URL contains "DevAlpha" from the dict.

    The dict is modified in place and also returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: List of duplicate URLs.
    :type dupelist: list(str)
    """
    doomed = (url for group in dupelist for url in group if "DevAlpha" in url)
    for url in doomed:
        del finals[url]
    return finals
1656
1657
1658
def dev_dupe_cleaner(finals):
    """
    Remove duplicate autoloader entries that share a content length.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_length = dev_dupe_dicter(finals)
    # Only content lengths shared by more than one URL count as duplicates
    dupelist = [urls for urls in by_length.values() if len(urls) > 1]
    return dev_dupe_remover(finals, dupelist)
1669