Total Complexity | 192
Total Lines | 1669
Duplicated Lines | 0%
Coverage | 100%
Changes | 0
Complex classes like bbarchivist.networkutils often do a lot of different things. To break such a class down, identify a cohesive component within it; a common way to find one is to look for fields and methods that share the same prefix or suffix.
Once you have determined which members belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often quicker to apply.
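As a minimal sketch of that advice applied here (hypothetical names, not part of the module): the tcl_*-prefixed helpers in the listing below share a prefix and could be grouped behind a single class, for example:

# Hypothetical Extract Class sketch: gather the tcl_* helpers from
# bbarchivist.networkutils behind one cohesive object.
from bbarchivist import networkutils


class TCLClient:
    """Wrap the TCL update-server helpers that share the tcl_ prefix."""

    def __init__(self, devid=None):
        self.devid = networkutils.tcl_default_id(devid)  # IMEI/serial fallback

    def check(self, curef, mode=4, fvver="AAA000"):
        # Delegate to the existing module-level function.
        return networkutils.tcl_check(curef, mode=mode, fvver=fvver)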
#!/usr/bin/env python3
"""This module is used for network connections; APIs, downloading, etc."""

import base64  # encoding
import binascii  # encoding
import concurrent.futures  # multiprocessing/threading
import glob  # pem file lookup
import hashlib  # salt
import os  # filesystem read
import random  # salt
import re  # regexes
import time  # salt
import zlib  # encoding

import requests  # downloading
from bs4 import BeautifulSoup  # scraping
from bbarchivist import utilities  # parse filesize
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers

try:
    from defusedxml import ElementTree  # safer XML parsing
except (ImportError, AttributeError):
    from xml.etree import ElementTree  # XML parsing

__author__ = "Thurask"
__license__ = "WTFPL v2"
__copyright__ = "2015-2018 Thurask"


def grab_pem():
    """
    Work with either local cacerts or system cacerts.
    """
    try:
        pemfile = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))[0]
    except IndexError:
        return requests.certs.where()  # no local cacerts
    else:
        return os.path.abspath(pemfile)  # local cacerts


def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    :param method: Method to use.
    :type method: function
    """
    def wrapper(*args, **kwargs):
        """
        Set REQUESTS_CA_BUNDLE before doing function.
        """
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper


def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and other common exceptions.

    :param method: Method to use.
    :type method: function
    """
    def wrapper(*args, **kwargs):
        """
        Try function, try it again up to five times, and leave gracefully.
        """
        tries = 5
        for _ in range(tries):
            try:
                result = method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue
            else:
                break
        else:
            result = None
        return result
    return wrapper
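A minimal usage sketch for the two decorators above (my_fetch is a hypothetical caller, not part of the module):

@pem_wrapper
@try_try_again
def my_fetch(url):
    # Runs with REQUESTS_CA_BUNDLE pointed at the bundled/system cacerts;
    # retried up to five times on timeouts, connection and proxy errors,
    # returning None if every attempt fails.
    return requests.get(url).text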
def generic_session(session=None):
    """
    Create a Requests session object on the fly, if need be.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    sess = requests.Session() if session is None else session
    return sess


def generic_soup_parser(url, session=None):
    """
    Get a BeautifulSoup HTML parser for some URL.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    req = session.get(url)
    soup = BeautifulSoup(req.content, "html.parser")
    return soup


@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    if url is None:
        return 0
    try:
        heads = session.head(url)
        fsize = heads.headers['content-length']
        return int(fsize)
    except requests.ConnectionError:
        return 0


@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Download file from given URL.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
    lfname = url.split('/')[-1]
    sname = utilities.stripper(lfname)
    fname = os.path.join(output_directory, lfname)
    download_writer(url, fname, lfname, sname, session)
    remove_empty_download(fname)


def remove_empty_download(fname):
    """
    Remove file if it's empty.

    :param fname: File path.
    :type fname: str
    """
    if os.stat(fname).st_size == 0:
        os.remove(fname)


def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        clength = req.headers['content-length']
        fsize = utilities.fsizer(clength)
        if req.status_code == 200:  # 200 OK
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
            for chunk in req.iter_content(chunk_size=1024):
                file.write(chunk)
        else:
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
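A brief usage sketch for the download helpers above (the URL and folder are placeholders):

# Hypothetical example: report a file's size, then fetch it.
url = "https://example.com/files/sample.zip"  # placeholder URL
print(get_length(url))  # content-length in bytes, 0 on failure
download(url, output_directory="downloads")  # writes downloads/sample.zip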
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Number of worker processes. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    workers = len(urls) if len(urls) < workers else workers
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
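A usage sketch for download_bootstrap (hypothetical URLs), downloading several files over one shared session:

# Hypothetical example: grab a handful of files concurrently.
sess = generic_session()
urls = [
    "https://example.com/a.zip",  # placeholder URLs
    "https://example.com/b.zip",
]
download_bootstrap(urls, outdir="downloads", workers=2, session=sess)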
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        os.removedirs(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
@pem_wrapper
def getcode(url, session=None):
    """
    Return status code of given URL.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    try:
        shead = session.head(url)
        status = int(shead.status_code)
        return status
    except requests.ConnectionError:
        return 404


@pem_wrapper
def availability(url, session=None):
    """
    Check HTTP status code of given URL.
    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    status = getcode(url, session)
    return status == 200 or 300 < status <= 308


def clean_availability(results, server):
    """
    Clean availability for autolookup script.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    marker = "PD" if server == "p" else server.upper()
    rel = results[server.lower()]
    avail = marker if rel != "SR not in system" and rel is not None else " "
    return rel, avail
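A small example of the availability helpers above (the URL is a placeholder):

# Hypothetical example: availability() treats 200 and 301-308 as "available".
url = "https://example.com/some/file.zip"  # placeholder
if availability(url):
    print("HTTP {0}: available".format(getcode(url)))
else:
    print("HTTP {0}: not available".format(getcode(url)))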
def tcl_master():
    """
    Get a random master server.
    """
    return random.choice(TCLMASTERS)


def tcl_default_id(devid):
    """
    Get an IMEI or a serial number or something.

    :param devid: Return default if this is None.
    :type devid: str
    """
    if devid is None:
        devid = "543212345000000"
    return devid


def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
    """
    Prepare variables for TCL update check.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
    :type cktp: int

    :param rtd: 2 if rooted, 1 if not. Default is 1.
    :type rtd: int

    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
    :type chnl: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    devid = tcl_default_id(devid)
    geturl = "http://{0}/check.php".format(tcl_master())
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "cltp": cltp, "cktp": cktp, "rtd": rtd, "chnl": chnl}
    return geturl, params


@pem_wrapper
@try_try_again
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
    """
    Check TCL server for updates.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    geturl, params = check_prep(curef, mode, fvver)
    req = sess.get(geturl, params=params)
    if req.status_code == 200:
        req.encoding = "utf-8"
        response = req.text
        if export:
            dump_tcl_xml(response)
    else:
        response = None
    return response
def parse_tcl_check(data):
    """
    Extract version and file info from TCL update server response.

    :param data: The data to parse.
    :type data: str
    """
    root = ElementTree.fromstring(data)
    tvver = root.find("VERSION").find("TV").text
    fwid = root.find("FIRMWARE").find("FW_ID").text
    fileinfo = root.find("FIRMWARE").find("FILESET").find("FILE")
    filename = fileinfo.find("FILENAME").text
    filesize = fileinfo.find("SIZE").text
    filehash = fileinfo.find("CHECKSUM").text
    return tvver, fwid, filename, filesize, filehash


def tcl_salt():
    """
    Generate salt value for TCL server tools.
    """
    millis = round(time.time() * 1000)
    tail = "{0:06d}".format(random.randint(0, 999999))
    return "{0}{1}".format(str(millis), tail)


def dump_tcl_xml(xmldata):
    """
    Write XML responses to output directory.

    :param xmldata: Response XML.
    :type xmldata: str
    """
    outfile = os.path.join(os.getcwd(), "logs", "{0}.xml".format(tcl_salt()))
    if not os.path.exists(os.path.dirname(outfile)):
        os.makedirs(os.path.dirname(outfile))
    with open(outfile, "w", encoding="utf-8") as afile:
        afile.write(xmldata)


def unpack_vdkey():
    """
    Draw the curtain back.
    """
    vdkey = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
    vdk = zlib.decompress(binascii.a2b_base64(vdkey))
    return vdk.decode("utf-8")


def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Generate hash from TCL update server variables.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    vdk = unpack_vdkey()
    devid = tcl_default_id(devid)
    query = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}".format(devid, salt, curef, fvver, tvver, "Firmware", fwid, mode, cltp, vdk)
    engine = hashlib.sha1()
    engine.update(bytes(query, "utf-8"))
    return engine.hexdigest()
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Prepare variables for download server check.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    devid = tcl_default_id(devid)
    posturl = "http://{0}/download_request.php".format(tcl_master())
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "tv": tvver, "fw_id": fwid, "salt": salt, "vk": vkh, "cltp": cltp}
    if mode == 4:
        params["foot"] = 1
    return posturl, params


@pem_wrapper
@try_try_again
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
    """
    Check TCL server for download URLs.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
    req = sess.post(posturl, data=params)
    if req.status_code == 200:
        req.encoding = "utf-8"
        response = req.text
        if export:
            dump_tcl_xml(response)
    else:
        response = None
    return response
def parse_tcl_download_request(body, mode=4):
    """
    Extract file URL and encrypt slave URL from TCL update server response.

    :param body: The data to parse.
    :type body: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int
    """
    root = ElementTree.fromstring(body)
    slavelist = root.find("SLAVE_LIST").findall("SLAVE")
    slave = random.choice(slavelist).text
    dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
    eslave = root.find("SLAVE_LIST").findall("ENCRYPT_SLAVE")
    encslave = None if mode == 2 or not eslave else random.choice(eslave).text
    return "http://{0}{1}".format(slave, dlurl), encslave


def encrypt_header_prep(address, encslave):
    """
    Prepare variables for encrypted header check.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str
    """
    encs = {b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv", b"cGFzc3dvcmQ=": b"cWFydUQ0b2s="}
    params = {base64.b64decode(key): base64.b64decode(val) for key, val in encs.items()}
    params[b"address"] = bytes(address, "utf-8")
    posturl = "http://{0}/encrypt_header.php".format(encslave)
    return posturl, params


@pem_wrapper
def encrypt_header(address, encslave, session=None):
    """
    Check encrypted header.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    posturl, params = encrypt_header_prep(address, encslave)
    req = sess.post(posturl, data=params)
    if req.status_code == 206:  # partial
        contentlength = int(req.headers["Content-Length"])
        sentinel = "HEADER FOUND" if contentlength == 4194320 else "NO HEADER FOUND"
    else:
        sentinel = None
    return sentinel


@pem_wrapper
def remote_prd_info():
    """
    Get list of remote OTA versions.
    """
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
    req = requests.get(dburl)
    reqj = req.json()
    otadict = {val["curef"]: val["last_ota"] for val in reqj.values() if val["last_ota"] is not None}
    return otadict
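A hedged end-to-end sketch of the TCL autoloader lookup flow built from the functions above (the PRD value is a placeholder):

# Hypothetical example: look up and resolve an autoloader URL for one PRD.
curef = "PRD-63116-001"  # placeholder PRD
body = tcl_check(curef)  # None if every attempt failed
if body is not None:
    tvver, fwid, filename, filesize, filehash = parse_tcl_check(body)
    salt = tcl_salt()
    vkh = vkhash(curef, tvver, fwid, salt)
    reply = tcl_download_request(curef, tvver, fwid, salt, vkh)
    if reply is not None:
        dlurl, encslave = parse_tcl_download_request(reply)
        print(tvver, dlurl)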
def cchecker_get_tags(root):
    """
    Get country and carrier from XML.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    for child in root:
        if child.tag == "country":
            country = child.get("name")
        if child.tag == "carrier":
            carrier = child.get("name")
    return country, carrier


@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Query BlackBerry World to map a MCC and a MNC to a country and carrier.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
    req = session.get(url, headers=user_agent)
    root = ElementTree.fromstring(req.text)
    country, carrier = cchecker_get_tags(root)
    return country, carrier


def return_npc(mcc, mnc):
    """
    Format MCC and MNC into a NPC.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    return "{0}{1}30".format(str(mcc).zfill(3), str(mnc).zfill(3))
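For example, return_npc pads both codes to three digits and appends "30":

# Worked example: MCC 302 (Canada) with MNC 220 -> "30222030".
npc = return_npc(302, 220)
assert npc == "30222030"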
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Query BlackBerry servers, check which update is out for a carrier.

    :param npc: MCC + MNC (see `func:return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    upg = "upgrade" if upgrade else "repair"
    forced = "latest" if forced is None else forced
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
    query = '<?xml version="1.0" encoding="UTF-8"?>'
    query += '<updateDetailRequest version="2.2.1" authEchoTS="1366644680359">'
    query += "<clientProperties>"
    query += "<hardware>"
    query += "<pin>0x2FFFFFB3</pin><bsn>1128121361</bsn>"
    query += "<imei>004401139269240</imei>"
    query += "<id>0x{0}</id>".format(device)
    query += "</hardware>"
    query += "<network>"
    query += "<homeNPC>0x{0}</homeNPC>".format(npc)
    query += "<iccid>89014104255505565333</iccid>"
    query += "</network>"
    query += "<software>"
    query += "<currentLocale>en_US</currentLocale>"
    query += "<legalLocale>en_US</legalLocale>"
    query += "</software>"
    query += "</clientProperties>"
    query += "<updateDirectives>"
    query += '<allowPatching type="REDBEND">true</allowPatching>'
    query += "<upgradeMode>{0}</upgradeMode>".format(upg)
    query += "<provideDescriptions>false</provideDescriptions>"
    query += "<provideFiles>true</provideFiles>"
    query += "<queryType>NOTIFICATION_CHECK</queryType>"
    query += "</updateDirectives>"
    query += "<pollType>manual</pollType>"
    query += "<resultPackageSetCriteria>"
    query += '<softwareRelease softwareReleaseVersion="{0}" />'.format(forced)
    query += "<releaseIndependent>"
    query += '<packageType operation="include">application</packageType>'
    query += "</releaseIndependent>"
    query += "</resultPackageSetCriteria>"
    query += "</updateDetailRequest>"
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(url, headers=header, data=query)
    return parse_carrier_xml(req.text, blitz)


def carrier_swver_get(root):
    """
    Get software release from carrier XML.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    for child in root.iter("softwareReleaseMetadata"):
        swver = child.get("softwareReleaseVersion")
    return swver


def carrier_child_fileappend(child, files, baseurl, blitz=False):
    """
    Append bar file links to a list from a child element.

    :param child: Child element in use.
    :type child: xml.etree.ElementTree.Element

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    if not blitz:
        files.append(baseurl + child.get("path"))
    else:
        if child.get("type") not in ["system:radio", "system:desktop", "system:os"]:
            files.append(baseurl + child.get("path"))
    return files


def carrier_child_finder(root, files, baseurl, blitz=False):
    """
    Extract filenames, radio and OS from child elements.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    osver = radver = ""
    for child in root.iter("package"):
        files = carrier_child_fileappend(child, files, baseurl, blitz)
        if child.get("type") == "system:radio":
            radver = child.get("version")
        elif child.get("type") == "system:desktop":
            osver = child.get("version")
        elif child.get("type") == "system:os":
            osver = child.get("version")
    return osver, radver, files


def parse_carrier_xml(data, blitz=False):
    """
    Parse the response to a carrier update request and return the juicy bits.

    :param data: The data to parse.
    :type data: xml

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    root = ElementTree.fromstring(data)
    sw_exists = root.find('./data/content/softwareReleaseMetadata')
    swver = "N/A" if sw_exists is None else ""
    if sw_exists is not None:
        swver = carrier_swver_get(root)
    files = []
    package_exists = root.find('./data/content/fileSets/fileSet')
    osver = radver = ""
    if package_exists is not None:
        baseurl = "{0}/".format(package_exists.get("url"))
        osver, radver, files = carrier_child_finder(root, files, baseurl, blitz)
    return (swver, osver, radver, files)
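A short usage sketch combining return_npc and carrier_query above (the hardware ID reuses the placeholder hex ID that appears in this module's queries):

# Hypothetical example: check the current repair release for one carrier.
npc = return_npc(302, 220)  # carrier NPC from MCC/MNC
swver, osver, radver, files = carrier_query(npc, "8D00240A")
print("SW {0} / OS {1} / Radio {2}, {3} bar files".format(swver, osver, radver, len(files)))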
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Software release lookup, with choice of server.
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    query = '<?xml version="1.0" encoding="UTF-8"?>'
    query += '<srVersionLookupRequest version="2.0.0"'
    query += ' authEchoTS="1366644680359">'
    query += '<clientProperties><hardware>'
    query += '<pin>0x2FFFFFB3</pin><bsn>1140011878</bsn>'
    query += '<imei>004402242176786</imei><id>0x8D00240A</id>'
    query += '<isBootROMSecure>true</isBootROMSecure>'
    query += '</hardware>'
    query += '<network>'
    query += '<vendorId>0x0</vendorId><homeNPC>0x60</homeNPC>'
    query += '<currentNPC>0x60</currentNPC><ecid>0x1</ecid>'
    query += '</network>'
    query += '<software><currentLocale>en_US</currentLocale>'
    query += '<legalLocale>en_US</legalLocale>'
    query += '<osVersion>{0}</osVersion>'.format(osver)
    query += '<omadmEnabled>false</omadmEnabled>'
    query += '</software></clientProperties>'
    query += '</srVersionLookupRequest>'
    reqtext = sr_lookup_poster(query, server, session)
    packtext = sr_lookup_xmlparser(reqtext)
    return packtext


def sr_lookup_poster(query, server, session=None):
    """
    Post the XML payload for a software release lookup.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        req = session.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        reqtext = "SR not in system"
    else:
        reqtext = req.text
    return reqtext


def sr_lookup_xmlparser(reqtext):
    """
    Take the text of a software lookup request response and parse it as XML.

    :param reqtext: Response text, hopefully XML formatted.
    :type reqtext: str
    """
    try:
        root = ElementTree.fromstring(reqtext)
    except ElementTree.ParseError:
        packtext = "SR not in system"
    else:
        packtext = sr_lookup_extractor(root)
    return packtext


def sr_lookup_extractor(root):
    """
    Take an ElementTree and extract a software release from it.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    reg = re.compile(r"(\d{1,4}\.)(\d{1,4}\.)(\d{1,4}\.)(\d{1,4})")
    packages = root.findall('./data/content/')
    for package in packages:
        if package.text is not None:
            match = reg.match(package.text)
            packtext = package.text if match else "SR not in system"
            return packtext


def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            results = {
                "p": None,
                "a1": None,
                "a2": None,
                "b1": None,
                "b2": None
            }
            if no2:
                del results["a2"]
                del results["b2"]
            for key in results:
                results[key] = xec.submit(sr_lookup, osv, SERVERS[key], session).result()
            return results
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
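A usage sketch for the lookup bootstrap above (the OS version is a placeholder):

# Hypothetical example: poll every lookup server for one OS version.
results = sr_lookup_bootstrap("10.3.3.2205", no2=True)
if results is not None:
    for server, swrelease in results.items():
        print(server, swrelease)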
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    Check which software releases were ever released for a carrier.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
    npc = return_npc(mcc, mnc)
    query = '<?xml version="1.0" encoding="UTF-8"?>'
    query += '<availableBundlesRequest version="1.0.0" '
    query += 'authEchoTS="1366644680359">'
    query += '<deviceId><pin>0x2FFFFFB3</pin></deviceId>'
    query += '<clientProperties><hardware><id>0x{0}</id>'.format(device)
    query += '<isBootROMSecure>true</isBootROMSecure></hardware>'
    query += '<network><vendorId>0x0</vendorId><homeNPC>0x{0}</homeNPC>'.format(npc)
    query += '<currentNPC>0x{0}</currentNPC></network><software>'.format(npc)
    query += '<currentLocale>en_US</currentLocale>'
    query += '<legalLocale>en_US</legalLocale>'
    query += '<osVersion>10.0.0.0</osVersion>'
    query += '<radioVersion>10.0.0.0</radioVersion></software>'
    query += '</clientProperties><updateDirectives><bundleVersionFilter>'
    query += '</bundleVersionFilter></updateDirectives>'
    query += '</availableBundlesRequest>'
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(server, headers=header, data=query)
    root = ElementTree.fromstring(req.text)
    package = root.find('./data/content')
    bundlelist = [child.attrib["version"] for child in package]
    return bundlelist
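A brief usage sketch for the bundle lookup above (the hardware ID is the same placeholder used elsewhere in this module):

# Hypothetical example: list every software release ever pushed for a carrier.
bundles = available_bundle_lookup(302, 220, "8D00240A")
for bundle in bundles:
    print(bundle)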
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Get the PTCRB results for a given device.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    sess = generic_session(session)
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
    sess.headers.update({"User-agent": useragent})
    soup = generic_soup_parser(baseurl, sess)
    certtable = soup.find_all("table")[1]
    tds = certtable.find_all("td")[1::2]  # every other
    prelimlist = [tdx.text for tdx in tds]
    cleanlist = [ptcrb_item_cleaner(item.strip()) for item in prelimlist]
    return cleanlist


def space_pad(instring, minlength):
    """
    Pad a string with spaces until it's the minimum length.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    while len(instring) < minlength:
        instring += " "
    return instring


def ptcrb_cleaner_multios(item):
    """
    Discard multiple entries for "OS".

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") > 1:
        templist = item.split("OS")
        templist[0] = "OS"
        item = "".join([templist[0], templist[1]])
    return item


def ptcrb_cleaner_spaces(item):
    """
    Pad item with spaces to the right length.

    :param item: The item to clean.
    :type item: str
    """
    spaclist = item.split(" ")
    if len(spaclist) > 1:
        spaclist[1] = space_pad(spaclist[1], 11)
    if len(spaclist) > 3:
        spaclist[3] = space_pad(spaclist[3], 11)
    item = " ".join(spaclist)
    return item


def ptcrb_item_cleaner(item):
    """
    Cleanup poorly formatted PTCRB entries written by an intern.

    :param item: The item to clean.
    :type item: str
    """
    item = item.replace("<td>", "")
    item = item.replace("</td>", "")
    item = item.replace("\n", "")
    item = item.replace("SW: OS", "OS")
    item = item.replace("Software Version: OS", "OS")
    item = item.replace(" (SR", ", SR")
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    item = item.replace(")", "")
    item = item.replace(". ", ".")
    item = item.replace(";", "")
    item = item.replace("version", "Version")
    item = item.replace("Verison", "Version")
    item = ptcrb_cleaner_multios(item)
    item = item.replace("SR10", "SR 10")
    item = item.replace("SR", "SW Release")
    item = item.replace(" Version:", ":")
    item = item.replace("Version ", " ")
    item = item.replace(":1", ": 1")
    item = item.replace(", ", " ")
    item = item.replace(",", " ")
    item = item.replace("Software", "SW")
    item = item.replace("  ", " ")
    item = item.replace("OS ", "OS: ")
    item = item.replace("Radio ", "Radio: ")
    item = item.replace("Release ", "Release: ")
    item = ptcrb_cleaner_spaces(item)
    item = item.strip()
    item = item.replace("\r", "")
    if item.startswith("10"):
        item = "OS: {0}".format(item)
    item = item.replace(":  ", ": ")
    item = item.replace(":  ", ": ")
    return item
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Scrape BlackBerry's GitHub kernel repo for available branches.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    repo = "android-utils" if utils else "android-linux-kernel"
    kernlist = []
    sess = generic_session(session)
    for page in range(1, 10):
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        else:
            text = soup.get_text()
            kernlist.extend(re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", text, re.IGNORECASE))
    return kernlist


def root_generator(folder, build, variant="common"):
    """
    Generate roots for the SHAxxx hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv specific
    privx = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50 specific
    dtek50x = "bbSupport/DTEK50" if build[:3] == "AAF" else "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 specific
    dtek60x = dtek50x  # still uses dtek50 folder, for some reason
    # Pack it up
    roots = {"Priv": privx, "DTEK50": dtek50x, "DTEK60": dtek60x}
    return roots
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    devices = {"KEYone": "qc8953", "Motion": "qc8953"}
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
    if method is None:
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
    else:
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
    return skel


def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the original site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    devices = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    roots = root_generator(folder, build, variant)
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
    if method is None:
        baseurl = "https://bbapps.download.blackberry.com/Priv"
        skel = "{1}/{0}.zip".format(base, baseurl)
    else:
        baseurl = "https://ca.blackberry.com/content/dam"
        skel = "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), baseurl)
    return skel


def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion")  # BB Mobile
    if device in oglist:
        skel = make_droid_skeleton_og(method, build, device, variant)
    elif device in bbmlist:
        skel = make_droid_skeleton_bbm(method, build, device, variant)
    return skel


def bulk_droid_skeletons(devs, build, method=None):
    """
    Prepare list of Android autoloader/hash URLs.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    common_variants = ("common", )  # for single-variant devices
    carrier_devices = ("Priv", )  # add KEYone when verified
    skels = []
    for dev in devs:
        varlist = carrier_variants[dev] if dev in carrier_devices else common_variants
        for var in varlist:
            skel = make_droid_skeleton(method, build, dev, var)
            skels.append(skel)
    return skels


def prepare_droid_list(device):
    """
    Convert single devices to a list, if necessary.

    :param device: Device to check.
    :type device: str
    """
    if isinstance(device, list):
        devs = device
    else:
        devs = [device]
    return devs


def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None


def droid_scanner_worker(xec, skels, session=None):
    """
    Worker to check for Android autoloaders.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    results = []
    for skel in skels:
        avail = xec.submit(availability, skel, session)
        if avail.result():
            results.append(skel)
    return results
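A usage sketch for the autoloader scanner above (the build string is a placeholder in the 3 letters + 3 numbers format):

# Hypothetical example: look for KEYone autoloaders for one build.
hits = droid_scanner("AAO472", "KEYone")  # placeholder build
if hits is None:
    print("No autoloaders found")
else:
    for hit in hits:
        print(hit)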
def chunker(iterable, inc):
    """
    Convert an iterable into a list of inc sized lists.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    chunks = [iterable[x:x + inc] for x in range(0, len(iterable), inc)]
    return chunks
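For instance, chunking a five-element list by twos leaves a shorter final chunk:

# Worked example of chunker().
assert chunker([1, 2, 3, 4, 5], 2) == [[1, 2], [3, 4], [5]]
assert chunker("abcdef", 3) == ["abc", "def"]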
1342 | def unicode_filter(intext): |
||
1343 | """ |
||
1344 | 5 | Remove Unicode crap. |
|
1345 | |||
1346 | :param intext: Text to filter. |
||
1347 | 5 | :type intext: str |
|
1348 | """ |
||
1349 | return intext.replace("\u2013", "").strip() |
||
1350 | |||
1351 | |||
1352 | def table_header_filter(ptag): |
||
1353 | """ |
||
1354 | 5 | Validate p tag, to see if it's relevant. |
|
1355 | 5 | ||
1356 | :param ptag: P tag. |
||
1357 | :type ptag: bs4.element.Tag |
||
1358 | 5 | """ |
|
1359 | valid = ptag.find("b") and "BlackBerry" in ptag.text and not "experts" in ptag.text |
||
1360 | return valid |
||
1361 | |||
1362 | |||
1363 | def table_headers(pees): |
||
1364 | """ |
||
1365 | 5 | Generate table headers from list of p tags. |
|
1366 | 5 | ||
1367 | :param pees: List of p tags. |
||
1368 | :type pees: list(bs4.element.Tag) |
||
1369 | 5 | """ |
|
1370 | 5 | bolds = [x.text for x in pees if table_header_filter(x)] |
|
1371 | return bolds |
||
1372 | |||
1373 | |||
1374 | @pem_wrapper |
||
1375 | def loader_page_scraper(session=None): |
||
1376 | """ |
||
1377 | 5 | Return scraped autoloader pages. |
|
1378 | 5 | ||
1379 | 5 | :param session: Requests session object, default is created on the fly. |
|
1380 | :type session: requests.Session() |
||
1381 | """ |
||
1382 | 5 | session = generic_session(session) |
|
1383 | loader_page_scraper_og(session) |
||
1384 | loader_page_scraper_bbm(session) |
||
1385 | |||
1386 | |||
1387 | def loader_page_scraper_og(session=None): |
||
1388 | """ |
||
1389 | 5 | Return scraped autoloader page, original site. |
|
1390 | 5 | ||
1391 | 5 | :param session: Requests session object, default is created on the fly. |
|
1392 | 5 | :type session: requests.Session() |
|
1393 | 5 | """ |
|
1394 | 5 | url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html" |
|
1395 | soup = generic_soup_parser(url, session) |
||
1396 | tables = soup.find_all("table") |
||
1397 | 5 | headers = table_headers(soup.find_all("p")) |
|
1398 | for idx, table in enumerate(tables): |
||
1399 | loader_page_chunker_og(idx, table, headers) |
||
1400 | |||
1401 | |||
1402 | def loader_page_scraper_bbm(session=None): |
||
1403 | """ |
||
1404 | 5 | Return scraped autoloader page, new site. |
|
1405 | 5 | ||
1406 | 5 | :param session: Requests session object, default is created on the fly. |
|
1407 | 5 | :type session: requests.Session() |
|
1408 | 5 | """ |
|
1409 | 5 | url = "https://www.blackberrymobile.com/support/reload-software/" |
|
1410 | soup = generic_soup_parser(url, session) |
||
1411 | ulls = soup.find_all("ul", {"class": re.compile("list-two special-.")})[1:] |
||
1412 | 5 | print("~~~BlackBerry KEYone~~~") |
|
1413 | for ull in ulls: |
||
1414 | loader_page_chunker_bbm(ull) |
||
1415 | |||
1416 | |||
1417 | def loader_page_chunker_og(idx, table, headers): |
||
1418 | """ |
||
1419 | Given a loader page table, chunk it into lists of table cells. |
||
1420 | |||
1421 | :param idx: Index of enumerating tables. |
||
1422 | :type idx: int |
||
1423 | |||
1424 | :param table: HTML table tag. |
||
1425 | 5 | :type table: bs4.element.Tag |
|
1426 | 5 | ||
1427 | 5 | :param headers: List of table headers. |
|
1428 | 5 | :type headers: list(str) |
|
1429 | 5 | """ |
|
1430 | print("~~~{0}~~~".format(headers[idx])) |
||
1431 | chunks = chunker(table.find_all("td"), 4) |
||
1432 | 5 | for chunk in chunks: |
|
1433 | loader_page_printer(chunk) |
||
1434 | print(" ") |
||
1435 | |||
1436 | |||
1437 | def loader_page_chunker_bbm(ull): |
||
1438 | """ |
||
1439 | 5 | Given a loader page list, chunk it into lists of list items. |
|
1440 | 5 | ||
1441 | 5 | :param ull: HTML unordered list tag. |
|
1442 | :type ull: bs4.element.Tag |
||
1443 | """ |
||
1444 | 5 | chunks = chunker(ull.find_all("li"), 3) |
|
1445 | for chunk in chunks: |
||
1446 | loader_page_printer(chunk) |
||
1447 | |||
1448 | |||
1449 | def loader_page_printer(chunk): |
||
1450 | """ |
||
1451 | 5 | Print individual cell texts given a list of table cells. |
|
1452 | 5 | ||
1453 | 5 | :param chunk: List of td tags. |
|
1454 | 5 | :type chunk: list(bs4.element.Tag) |
|
1455 | """ |
||
1456 | key = unicode_filter(chunk[0].text) |
||
1457 | 5 | ver = unicode_filter(chunk[1].text) |
|
1458 | 5 | link = unicode_filter(chunk[2].find("a")["href"]) |
|
1459 | print("{0}\n {1}: {2}".format(key, ver, link)) |
||
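A sketch of what the printer above does with one chunk of tags; the device name, build, and URL are made up, and unicode_filter() is simplified here to a plain strip():

    from bs4 import BeautifulSoup

    row = BeautifulSoup(
        '<li>Example device</li><li>AAA000</li>'
        '<li><a href="http://example.com/loader.zip">Download</a></li>',
        "html.parser")
    chunk = row.find_all("li")
    key, ver = chunk[0].text.strip(), chunk[1].text.strip()
    link = chunk[2].find("a")["href"].strip()
    print("{0}\n {1}: {2}".format(key, ver, link))
    # Example device
    #  AAA000: http://example.com/loader.zip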
1460 | |||
1461 | |||
1462 | @pem_wrapper |
||
1463 | def base_metadata(url, session=None): |
||
1464 | """ |
||
1465 | Get BBNDK metadata, base function. |
||
1466 | |||
1467 | :param url: URL to check. |
||
1468 | 5 | :type url: str |
|
1469 | 5 | ||
1470 | 5 | :param session: Requests session object, default is created on the fly. |
|
1471 | 5 | :type session: requests.Session() |
|
1472 | 5 | """ |
|
1473 | 5 | session = generic_session(session) |
|
1474 | req = session.get(url) |
||
1475 | data = req.content |
||
1476 | 5 | entries = data.split(b"\n") |
|
1477 | metadata = [entry.split(b",")[1].decode("utf-8") for entry in entries if entry] |
||
1478 | return metadata |
||
1479 | |||
1480 | |||
1481 | def base_metadata_url(alternate=None): |
||
1482 | """ |
||
1483 | 5 | Return metadata URL. |
|
1484 | 5 | ||
1485 | 5 | :param alternate: Alternate metadata subdirectory (e.g. "simulator", "runtime"). Default is None. |
|
1486 | :type alternate: str |
||
1487 | """ |
||
1488 | 5 | baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk" |
|
1489 | tail = "{0}/{0}_metadata".format(alternate) if alternate is not None else "metadata" |
||
1490 | return "{0}/{1}".format(baseurl, tail) |
||
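A sketch of how the metadata URL is composed and how base_metadata() parses the response; the sample payload below is invented:

    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    print("{0}/{1}".format(baseurl, "metadata"))                      # target metadata
    print("{0}/{1}".format(baseurl, "simulator/simulator_metadata"))  # simulator metadata

    sample = b"x,10.2.0.1155,y\nx,10.3.1.995,y\n"
    versions = [entry.split(b",")[1].decode("utf-8")
                for entry in sample.split(b"\n") if entry]
    print(versions)  # ['10.2.0.1155', '10.3.1.995']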
1491 | |||
1492 | |||
1493 | def ndk_metadata(session=None): |
||
1494 | """ |
||
1495 | 5 | Get BBNDK target metadata. |
|
1496 | 5 | ||
1497 | :param session: Requests session object, default is created on the fly. |
||
1498 | :type session: requests.Session() |
||
1499 | 5 | """ |
|
1500 | ndkurl = base_metadata_url() |
||
1501 | data = base_metadata(ndkurl, session) |
||
1502 | metadata = [entry for entry in data if entry.startswith(("10.0", "10.1", "10.2"))] |
||
1503 | return metadata |
||
1504 | |||
1505 | |||
1506 | 5 | def sim_metadata(session=None): |
|
1507 | 5 | """ |
|
1508 | Get BBNDK simulator metadata. |
||
1509 | |||
1510 | 5 | :param session: Requests session object, default is created on the fly. |
|
1511 | :type session: requests.Session() |
||
1512 | """ |
||
1513 | simurl = base_metadata_url("simulator") |
||
1514 | metadata = base_metadata(simurl, session) |
||
1515 | return metadata |
||
1516 | |||
1517 | 5 | ||
1518 | 5 | def runtime_metadata(session=None): |
|
1519 | """ |
||
1520 | Get BBNDK runtime metadata. |
||
1521 | 5 | ||
1522 | 5 | :param session: Requests session object, default is created on the fly. |
|
1523 | :type session: requests.Session() |
||
1524 | """ |
||
1525 | rturl = base_metadata_url("runtime") |
||
1526 | metadata = base_metadata(rturl, session) |
||
1527 | return metadata |
||
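Usage sketch for the three wrappers above, assuming bbarchivist is installed and the metadata server is reachable:

    from bbarchivist import networkutils

    targets = networkutils.ndk_metadata()       # 10.0/10.1/10.2 target versions
    sims = networkutils.sim_metadata()          # simulator versions
    runtimes = networkutils.runtime_metadata()  # runtime versions
    print(len(targets), len(sims), len(runtimes))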
1528 | |||
1529 | |||
1530 | def series_generator(osversion): |
||
1531 | """ |
||
1532 | Generate series/branch name from OS version. |
||
1533 | |||
1534 | :param osversion: OS version. |
||
1535 | 5 | :type osversion: str |
|
1536 | 5 | """ |
|
1537 | 5 | splits = osversion.split(".") |
|
1538 | 5 | return "BB{0}_{1}_{2}".format(*splits[0:3]) |
|
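A worked example of the series name generation above (the OS version is arbitrary):

    def series_generator(osversion):
        """Build a BBx_y_z branch name from the first three version fields."""
        return "BB{0}_{1}_{2}".format(*osversion.split(".")[0:3])

    print(series_generator("10.3.2.2876"))  # BB10_3_2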
1539 | 5 | ||
1540 | |||
1541 | 5 | @pem_wrapper |
|
1542 | 5 | def devalpha_urls(osversion, skel, session=None): |
|
1543 | """ |
||
1544 | Check an individual Dev Alpha autoloader URL. |
||
1545 | 5 | ||
1546 | :param osversion: OS version. |
||
1547 | :type osversion: str |
||
1548 | |||
1549 | :param skel: Individual skeleton format to try. |
||
1550 | :type skel: str |
||
1551 | |||
1552 | :param session: Requests session object, default is created on the fly. |
||
1553 | :type session: requests.Session() |
||
1554 | """ |
||
1555 | 5 | session = generic_session(session) |
|
1556 | 5 | baseurl = "http://downloads.blackberry.com/upr/developers/downloads" |
|
1557 | 5 | url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl) |
|
1558 | 5 | req = session.head(url) |
|
1559 | 5 | if req.status_code == 200: |
|
1560 | finals = (url, req.headers["content-length"]) |
||
1561 | else: |
||
1562 | 5 | finals = () |
|
1563 | return finals |
||
1564 | |||
1565 | |||
1566 | def devalpha_urls_serieshandler(osversion, skeletons): |
||
1567 | """ |
||
1568 | Expand series placeholders in candidate Dev Alpha autoloader skeletons. |
||
1569 | |||
1570 | :param osversion: OS version. |
||
1571 | :type osversion: str |
||
1572 | |||
1573 | :param skeletons: List of skeleton formats to try. |
||
1574 | :type skeletons: list |
||
1575 | """ |
||
1576 | skels = list(skeletons)  # copy, so the caller's list is not mutated |
||
1577 | for idx, skel in enumerate(skeletons): |
||
1578 | 5 | if "<SERIES>" in skel: |
|
1579 | 5 | skels[idx] = skel.replace("<SERIES>", series_generator(osversion)) |
|
1580 | 5 | return skels |
|
1581 | 5 | ||
1582 | 5 | ||
1583 | 5 | def devalpha_urls_bulk(osversion, skeletons, xec, session=None): |
|
1584 | 5 | """ |
|
1585 | Construct list of valid Dev Alpha autoloader URLs. |
||
1586 | |||
1587 | 5 | :param osversion: OS version. |
|
1588 | :type osversion: str |
||
1589 | |||
1590 | :param skeletons: List of skeleton formats to try. |
||
1591 | :type skeletons: list |
||
1592 | |||
1593 | :param xec: ThreadPoolExecutor instance. |
||
1594 | :type xec: concurrent.futures.ThreadPoolExecutor |
||
1595 | |||
1596 | :param session: Requests session object, default is created on the fly. |
||
1597 | :type session: requests.Session() |
||
1598 | """ |
||
1599 | finals = {} |
||
1600 | 5 | skels = devalpha_urls_serieshandler(osversion, skeletons) |
|
1601 | 5 | for skel in skels: |
|
1602 | 5 | final = xec.submit(devalpha_urls, osversion, skel, session).result() |
|
1603 | 5 | if final: |
|
1604 | 5 | finals[final[0]] = final[1] |
|
1605 | return finals |
||
1606 | |||
1607 | 5 | ||
1608 | def devalpha_urls_bootstrap(osversion, skeletons, session=None): |
||
1609 | """ |
||
1610 | Get list of valid Dev Alpha autoloader URLs. |
||
1611 | |||
1612 | :param osversion: OS version. |
||
1613 | :type osversion: str |
||
1614 | 5 | ||
1615 | 5 | :param skeletons: List of skeleton formats to try. |
|
1616 | 5 | :type skeletons: list |
|
1617 | 5 | ||
1618 | :param session: Requests session object, default is created on the fly. |
||
1619 | :type session: requests.Session() |
||
1620 | 5 | """ |
|
1621 | with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec: |
||
1622 | try: |
||
1623 | return devalpha_urls_bulk(osversion, skeletons, xec, session) |
||
1624 | except KeyboardInterrupt: |
||
1625 | xec.shutdown(wait=False) |
||
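Usage sketch for the Dev Alpha lookup above, assuming bbarchivist is installed; the skeleton strings are hypothetical, and any "<SERIES>" placeholder is expanded via series_generator() before each URL is HEAD-checked:

    from bbarchivist import networkutils

    skeletons = ["DevAlphaC-<SERIES>-", "DevAlphaB-<SERIES>-"]  # hypothetical formats
    finals = networkutils.devalpha_urls_bootstrap("10.3.2.2876", skeletons)
    for url, size in (finals or {}).items():  # may be None after a KeyboardInterrupt
        print(url, size)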
1626 | |||
1627 | |||
1628 | def dev_dupe_dicter(finals): |
||
1629 | """ |
||
1630 | 5 | Invert the URL:content-length dict, mapping each size to the URLs that share it. |
|
1631 | 5 | ||
1632 | 5 | :param finals: Dict of URL:content-length pairs. |
|
1633 | 5 | :type finals: dict(str: str) |
|
1634 | 5 | """ |
|
1635 | revo = {} |
||
1636 | for key, val in finals.items(): |
||
1637 | 5 | revo.setdefault(val, set()).add(key) |
|
1638 | return revo |
||
1639 | |||
1640 | |||
1641 | def dev_dupe_remover(finals, dupelist): |
||
1642 | """ |
||
1643 | Filter dictionary of autoloader entries. |
||
1644 | 5 | ||
1645 | 5 | :param finals: Dict of URL:content-length pairs. |
|
1646 | 5 | :type finals: dict(str: str) |
|
1647 | 5 | ||
1648 | :param dupelist: List of sets of duplicate URLs. |
||
1649 | :type dupelist: list(set(str)) |
||
1650 | """ |
||
1651 | for dupe in dupelist: |
||
1652 | for entry in dupe: |
||
1653 | if "DevAlpha" in entry: |
||
1654 | del finals[entry] |
||
1655 | return finals |
||
1656 | |||
1657 | |||
1658 | def dev_dupe_cleaner(finals): |
||
1659 | """ |
||
1660 | Clean duplicate autoloader entries. |
||
1661 | |||
1662 | :param finals: Dict of URL:content-length pairs. |
||
1663 | :type finals: dict(str: str) |
||
1664 | """ |
||
1665 | revo = dev_dupe_dicter(finals) |
||
1666 | dupelist = [val for val in revo.values() if len(val) > 1] |
||
1667 | finals = dev_dupe_remover(finals, dupelist) |
||
1668 | return finals |
||
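A self-contained sketch of the duplicate-removal flow above, with made-up URLs and sizes; two entries share a content-length, so the one whose URL contains "DevAlpha" is dropped:

    finals = {
        "http://example.com/DevAlphaC.exe": "12345",
        "http://example.com/Passport.exe": "12345",
        "http://example.com/Classic.exe": "67890",
    }
    revo = {}
    for key, val in finals.items():
        revo.setdefault(val, set()).add(key)  # content-length -> set of URLs
    dupes = [val for val in revo.values() if len(val) > 1]
    for dupe in dupes:
        for entry in dupe:
            if "DevAlpha" in entry:
                del finals[entry]
    print(sorted(finals))
    # ['http://example.com/Classic.exe', 'http://example.com/Passport.exe']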
1669 |