GitHub Access Token became invalid

It seems the GitHub access token used to retrieve details about this repository has become invalid. This may prevent certain types of inspections from running (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed: Push to master ( cfccd9...b77fc8 ) by Gonzalo, 27s, created

RequestsDownloadWorker (rated A)

Complexity
Total Complexity: 4

Size/Duplication
Total Lines: 35
Duplicated Lines: 100%

Importance
Changes: 2
Bugs: 0
Features: 0

Metric  Value
c       2
b       0
f       0
dl      35
loc     35
rs      10
wmc     4
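
A plausible reading of the raw metrics (hedged, since the abbreviations are not defined on this page): c, b and f mirror the Changes/Bugs/Features counters above; dl and loc are duplicated lines and lines of code (35 of 35, hence Duplicated Lines 100%); and wmc is the weighted method count, i.e. the sum of per-method cyclomatic complexities. For the RequestsDownloadWorker class in the listing below that sum works out as:

    __init__       1
    is_finished    1
    start          2   (one extra branch for the try/except)
    ------------------
    wmc            4

which matches both "Total Complexity 4" and "wmc 4".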

How to fix: Duplicated Code

Duplicate code is one of the most pungent code smells. A rule of thumb that is often used is to restructure code once it is duplicated in three or more places.

Common duplication problems, and their corresponding solutions, are:
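
One of the most common problems, used here as an illustration (a minimal, hypothetical sketch, not code taken from this repository), is the same "drop the finished workers" loop repeated across several _clean() methods; the usual solution is to extract the shared loop into a single helper and leave only class-specific bookkeeping in each caller.

# Hypothetical shared helper, for illustration only.
def pop_finished_workers(workers):
    """Remove finished workers from a url -> worker mapping.

    Return the urls that were removed, so each caller can do its own
    extra cleanup (cached paths, pending requests, timers, ...).
    """
    removed = []
    for url, worker in list(workers.items()):
        if worker.is_finished():
            workers.pop(url)
            removed.append(url)
    return removed

A _clean() method built on such a helper would then reduce to something like

    for url in pop_finished_workers(self._workers):
        self._paths.pop(url, None)
        self._get_requests.pop(url, None)

keeping the duplicated loop in exactly one place.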

# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# Copyright © 2015- The Spyder Development Team
# Copyright © 2014-2015 Gonzalo Peña-Castellanos (@goanpeca)
#
# Licensed under the terms of the MIT License
# -----------------------------------------------------------------------------
"""Worker threads for downloading files."""

# Standard library imports
from collections import deque
import json
import os
import re
import sys

# Third party imports
from qtpy.QtCore import QByteArray, QObject, QThread, QTimer, QUrl, Signal
from qtpy.QtNetwork import (QNetworkAccessManager, QNetworkProxy,
                            QNetworkProxyFactory, QNetworkRequest)
import requests

# Local imports
from conda_manager.api.conda_api import CondaAPI
from conda_manager.utils.logs import logger
from conda_manager.utils.py3compat import to_text_string

PROXY_RE = re.compile(r'(?P<scheme>.*?)://'
                      '((?P<username>.*):(?P<password>.*)@)?'
                      '(?P<host_port>.*)')


def handle_qbytearray(obj, encoding):
    """Qt/Python3 compatibility helper."""
    if isinstance(obj, QByteArray):
        obj = obj.data()

    return to_text_string(obj, encoding=encoding)


def process_proxy_servers(proxy_settings):
    """Split the proxy conda configuration to be used by the proxy factory."""
    proxy_settings_dic = {}

    for key in proxy_settings:
        proxy = proxy_settings[key]
        proxy_config = [m.groupdict() for m in PROXY_RE.finditer(proxy)]
        if proxy_config:
            proxy_config = proxy_config[0]
            host_port = proxy_config.pop('host_port')
            if ':' in host_port:
                host, port = host_port.split(':')
            else:
                host, port = host_port, None
            proxy_config['host'] = host
            proxy_config['port'] = int(port) if port else None
            proxy_settings_dic[key] = proxy_config
            proxy_config['full'] = proxy_settings[key]

    return proxy_settings_dic


class NetworkProxyFactory(QNetworkProxyFactory):
    """Proxy factory to handle different proxy configuration."""

    def __init__(self, *args, **kwargs):
        """Proxy factory to handle different proxy configuration."""
        self._load_rc_func = kwargs.pop('load_rc_func', None)
        super(NetworkProxyFactory, self).__init__(*args, **kwargs)

    @property
    def proxy_servers(self):
        """
        Return the proxy servers available.

        First env variables will be searched and updated with values from
        condarc config file.
        """
        proxy_servers = {}
        if self._load_rc_func is None:
            return proxy_servers
        else:
            HTTP_PROXY = os.environ.get('HTTP_PROXY')
            HTTPS_PROXY = os.environ.get('HTTPS_PROXY')

            if HTTP_PROXY:
                proxy_servers['http'] = HTTP_PROXY

            if HTTPS_PROXY:
                proxy_servers['https'] = HTTPS_PROXY

            proxy_servers_conf = self._load_rc_func().get('proxy_servers', {})
            proxy_servers.update(proxy_servers_conf)

            return proxy_servers

    @staticmethod
    def _create_proxy(proxy_setting):
        """Create a Network proxy for the given proxy settings."""
        proxy = QNetworkProxy()
        proxy_scheme = proxy_setting['scheme']
        proxy_host = proxy_setting['host']
        proxy_port = proxy_setting['port']
        proxy_username = proxy_setting['username']
        proxy_password = proxy_setting['password']
        proxy_scheme_host = '{0}://{1}'.format(proxy_scheme, proxy_host)
        proxy.setType(QNetworkProxy.HttpProxy)

        if proxy_scheme_host:
            # proxy.setHostName(proxy_scheme_host)  # does not work with scheme
            proxy.setHostName(proxy_host)

        if proxy_port:
            proxy.setPort(proxy_port)

        if proxy_username:
            proxy.setUser(proxy_username)

        if proxy_password:
            proxy.setPassword(proxy_password)

        return proxy

    def queryProxy(self, query):
        """Override Qt method."""
        # Query is a QNetworkProxyQuery
        valid_proxies = []

        query_scheme = query.url().scheme()
        query_host = query.url().host()
        query_scheme_host = '{0}://{1}'.format(query_scheme, query_host)
        proxy_servers = process_proxy_servers(self.proxy_servers)
#        print(proxy_servers)

        if proxy_servers:
            for key in proxy_servers:
                proxy_settings = proxy_servers[key]

                if key == 'http' and query_scheme == 'http':
                    proxy = self._create_proxy(proxy_settings)
                    valid_proxies.append(proxy)
                elif key == 'https' and query_scheme == 'https':
                    proxy = self._create_proxy(proxy_settings)
                    valid_proxies.append(proxy)

                if key == query_scheme_host:
                    proxy = self._create_proxy(proxy_settings)
                    valid_proxies.append(proxy)
        else:
            valid_proxies.append(QNetworkProxy(QNetworkProxy.DefaultProxy))

#        print('factoy', query.url().toString())
#        print(valid_proxies)
#        for pr in valid_proxies:
#            user = pr.user()
#            password = pr.password()
#            host = pr.hostName()
#            port = pr.port()
#            print(query.url(), user, password, host, port)
#        print('\n')
        return valid_proxies


class DownloadWorker(QObject):
    """Qt Download worker."""

    # url, path
    sig_download_finished = Signal(str, str)

    # url, path, progress_size, total_size
    sig_download_progress = Signal(str, str, int, int)
    sig_finished = Signal(object, object, object)

    def __init__(self, url, path):
        """Qt Download worker."""
        super(DownloadWorker, self).__init__()
        self.url = url
        self.path = path
        self.finished = False

    def is_finished(self):
        """Return True if worker status is finished otherwise return False."""
        return self.finished


class _DownloadAPI(QObject):
    """Download API based on QNetworkAccessManager."""

    def __init__(self, chunk_size=1024, load_rc_func=None):
        """Download API based on QNetworkAccessManager."""
        super(_DownloadAPI, self).__init__()
        self._chunk_size = chunk_size
        self._head_requests = {}
        self._get_requests = {}
        self._paths = {}
        self._workers = {}

        self._load_rc_func = load_rc_func
        self._manager = QNetworkAccessManager(self)
        self._proxy_factory = NetworkProxyFactory(load_rc_func=load_rc_func)
        self._timer = QTimer()

        # Setup
        self._manager.setProxyFactory(self._proxy_factory)
        self._timer.setInterval(1000)
        self._timer.timeout.connect(self._clean)

        # Signals
        self._manager.finished.connect(self._request_finished)
        self._manager.sslErrors.connect(self._handle_ssl_errors)
        self._manager.proxyAuthenticationRequired.connect(
            self._handle_proxy_auth)

    @staticmethod
    def _handle_ssl_errors(reply, errors):
        """Callback for ssl_errors."""
        logger.error(str(('SSL Errors', errors, reply)))

    @staticmethod
    def _handle_proxy_auth(proxy, authenticator):
        """Callback for proxy authentication errors."""
#        authenticator.setUser('1')
#        authenticator.setPassword('1')
        logger.error(str(('Proxy authentication Error. '
                          'Enter credentials in condarc',
                          proxy,
                          authenticator)))

    def _clean(self):
        """Check for inactive workers and remove their references."""
        if self._workers:
            for url in self._workers.copy():
                w = self._workers[url]
                if w.is_finished():
                    self._workers.pop(url)
                    self._paths.pop(url)
                    if url in self._get_requests:
                        self._get_requests.pop(url)

        else:
            self._timer.stop()

    def _request_finished(self, reply):
        """Callback for download once the request has finished."""
        url = to_text_string(reply.url().toEncoded(), encoding='utf-8')

        if url in self._paths:
            path = self._paths[url]
        if url in self._workers:
            worker = self._workers[url]

        if url in self._head_requests:
            error = reply.error()
#            print(url, error)
            if error:
                logger.error(str(('Head Reply Error:', error)))
                worker.sig_download_finished.emit(url, path)
                worker.sig_finished.emit(worker, path, error)
                return

            self._head_requests.pop(url)
            start_download = not bool(error)
            header_pairs = reply.rawHeaderPairs()
            headers = {}

            for hp in header_pairs:
                headers[to_text_string(hp[0]).lower()] = to_text_string(hp[1])

            total_size = int(headers.get('content-length', 0))

            # Check if file exists
            if os.path.isfile(path):
                file_size = os.path.getsize(path)

                # Check if existing file matches size of requested file
                start_download = file_size != total_size

            if start_download:
                # File sizes dont match, hence download file
                qurl = QUrl(url)
                request = QNetworkRequest(qurl)
                self._get_requests[url] = request
                reply = self._manager.get(request)

                error = reply.error()
                if error:
                    logger.error(str(('Reply Error:', error)))

                reply.downloadProgress.connect(
                    lambda r, t, w=worker: self._progress(r, t, w))
            else:
                # File sizes match, dont download file or error?
                worker.finished = True
                worker.sig_download_finished.emit(url, path)
                worker.sig_finished.emit(worker, path, None)
        elif url in self._get_requests:
            data = reply.readAll()
            self._save(url, path, data)

    def _save(self, url, path, data):
        """Save `data` of downloaded `url` in `path`."""
        worker = self._workers[url]
        path = self._paths[url]

        if len(data):
            try:
                with open(path, 'wb') as f:
                    f.write(data)
            except Exception:
                logger.error((url, path))

        # Clean up
        worker.finished = True
        worker.sig_download_finished.emit(url, path)
        worker.sig_finished.emit(worker, path, None)
        self._get_requests.pop(url)
        self._workers.pop(url)
        self._paths.pop(url)

    @staticmethod
    def _progress(bytes_received, bytes_total, worker):
        """Return download progress."""
        worker.sig_download_progress.emit(
            worker.url, worker.path, bytes_received, bytes_total)

    def download(self, url, path):
        """Download url and save data to path."""
        # original_url = url
#        print(url)
        qurl = QUrl(url)
        url = to_text_string(qurl.toEncoded(), encoding='utf-8')

        logger.debug(str((url, path)))
        if url in self._workers:
            while not self._workers[url].finished:
                return self._workers[url]

        worker = DownloadWorker(url, path)

        # Check download folder exists
        folder = os.path.dirname(os.path.abspath(path))
        if not os.path.isdir(folder):
            os.makedirs(folder)

        request = QNetworkRequest(qurl)
        self._head_requests[url] = request
        self._paths[url] = path
        self._workers[url] = worker
        self._manager.head(request)
        self._timer.start()

        return worker

    def terminate(self):
        """Terminate all download workers and threads."""
        pass


class RequestsDownloadWorker(QObject):
    """Download Worker based on requests."""

    sig_finished = Signal(object, object, object)
    sig_download_finished = Signal(str, str)
    sig_download_progress = Signal(str, str, int, int)

    def __init__(self, method, args, kwargs):
        """Download Worker based on requests."""
        super(RequestsDownloadWorker, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs
        self._is_finished = False

    def is_finished(self):
        """Return True if worker status is finished otherwise return False."""
        return self._is_finished

    def start(self):
        """Start process worker for given method args and kwargs."""
        error = None
        output = None

        try:
            output = self.method(*self.args, **self.kwargs)
        except Exception as err:
            error = err
            logger.debug(str((self.method.__name__,
                              self.method.__module__,
                              error)))

        self.sig_finished.emit(self, output, error)
        self._is_finished = True


class _RequestsDownloadAPI(QObject):
    """Download API based on requests."""

    _sig_download_finished = Signal(str, str)
    _sig_download_progress = Signal(str, str, int, int)

    def __init__(self, load_rc_func=None):
        """Download API based on requests."""
        super(_RequestsDownloadAPI, self).__init__()
        self._conda_api = CondaAPI()
        self._queue = deque()
        self._threads = []
        self._workers = []
        self._timer = QTimer()

        self._load_rc_func = load_rc_func
        self._chunk_size = 1024
        self._timer.setInterval(1000)
        self._timer.timeout.connect(self._clean)

    @property
    def proxy_servers(self):
        """Return the proxy servers available from the conda rc config file."""
        if self._load_rc_func is None:
            return {}
        else:
            return self._load_rc_func().get('proxy_servers', {})

    def _clean(self):
        """Check for inactive workers and remove their references."""
        if self._workers:
            for w in self._workers[:]:
                if w.is_finished():
                    self._workers.remove(w)

        if self._threads:
            for t in self._threads[:]:
                if t.isFinished():
                    self._threads.remove(t)
        else:
            self._timer.stop()

    def _start(self):
        """Start the next threaded worker in the queue."""
        if len(self._queue) == 1:
            thread = self._queue.popleft()
            thread.start()
            self._timer.start()

    def _create_worker(self, method, *args, **kwargs):
        """Create a new worker instance."""
        thread = QThread()
        worker = RequestsDownloadWorker(method, args, kwargs)
        worker.moveToThread(thread)
        worker.sig_finished.connect(self._start)
        self._sig_download_finished.connect(worker.sig_download_finished)
        self._sig_download_progress.connect(worker.sig_download_progress)
        worker.sig_finished.connect(thread.quit)
        thread.started.connect(worker.start)
        self._queue.append(thread)
        self._threads.append(thread)
        self._workers.append(worker)
        self._start()
        return worker

    def _download(self, url, path=None, force=False):
        """Callback for download."""
        if path is None:
            path = url.split('/')[-1]

        # Make dir if non existent
        folder = os.path.dirname(os.path.abspath(path))

        if not os.path.isdir(folder):
            os.makedirs(folder)

        # Start actual download
        try:
            r = requests.get(url, stream=True, proxies=self.proxy_servers)
        except Exception as error:
            logger.error(str(error))
            # Break if error found!
#            self._sig_download_finished.emit(url, path)
#            return path

        total_size = int(r.headers.get('Content-Length', 0))

        # Check if file exists
        if os.path.isfile(path) and not force:
            file_size = os.path.getsize(path)

            # Check if existing file matches size of requested file
            if file_size == total_size:
                self._sig_download_finished.emit(url, path)
                return path

        # File not found or file size did not match. Download file.
        progress_size = 0
        with open(path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=self._chunk_size):
                if chunk:
                    f.write(chunk)
                    progress_size += len(chunk)
                    self._sig_download_progress.emit(url, path,
                                                     progress_size,
                                                     total_size)
            self._sig_download_finished.emit(url, path)

        return path

    def _is_valid_url(self, url):
        """Callback for is_valid_url."""
        try:
            r = requests.head(url, proxies=self.proxy_servers)
            value = r.status_code in [200]
        except Exception as error:
            logger.error(str(error))
            value = False

        return value

    def _is_valid_channel(self, channel,
                          conda_url='https://conda.anaconda.org'):
        """Callback for is_valid_channel."""
        if channel.startswith('https://') or channel.startswith('http://'):
            url = channel
        else:
            url = "{0}/{1}".format(conda_url, channel)

        if url[-1] == '/':
            url = url[:-1]

        plat = self._conda_api.get_platform()
        repodata_url = "{0}/{1}/{2}".format(url, plat, 'repodata.json')

        try:
            r = requests.head(repodata_url, proxies=self.proxy_servers)
            value = r.status_code in [200]
        except Exception as error:
            logger.error(str(error))
            value = False

        return value

    def _is_valid_api_url(self, url):
        """Callback for is_valid_api_url."""
        # Check response is a JSON with ok: 1
        data = {}
        try:
            r = requests.get(url, proxies=self.proxy_servers)
            content = to_text_string(r.content, encoding='utf-8')
            data = json.loads(content)
        except Exception as error:
            logger.error(str(error))

        return data.get('ok', 0) == 1

    # --- Public API
    # -------------------------------------------------------------------------
    def download(self, url, path=None, force=False):
        """Download file given by url and save it to path."""
        logger.debug(str((url, path, force)))
        method = self._download
        return self._create_worker(method, url, path=path, force=force)

    def terminate(self):
        """Terminate all workers and threads."""
        for t in self._threads:
            t.quit()
        self._threads = []
        self._workers = []

    def is_valid_url(self, url, non_blocking=True):
        """Check if url is valid."""
        logger.debug(str((url)))
        if non_blocking:
            method = self._is_valid_url
            return self._create_worker(method, url)
        else:
            return self._is_valid_url(url)

    def is_valid_api_url(self, url, non_blocking=True):
        """Check if anaconda api url is valid."""
        logger.debug(str((url)))
        if non_blocking:
            method = self._is_valid_api_url
            return self._create_worker(method, url)
        else:
            return self._is_valid_api_url(url=url)

    def is_valid_channel(self,
                         channel,
                         conda_url='https://conda.anaconda.org',
                         non_blocking=True):
        """Check if a conda channel is valid."""
        logger.debug(str((channel, conda_url)))
        if non_blocking:
            method = self._is_valid_channel
            return self._create_worker(method, channel, conda_url)
        else:
            return self._is_valid_channel(channel, conda_url=conda_url)

    def get_api_info(self, url):
        """Query anaconda api info."""
        data = {}
        try:
            r = requests.get(url, proxies=self.proxy_servers)
            content = to_text_string(r.content, encoding='utf-8')
            data = json.loads(content)
            if not data:
                data['api_url'] = url
            if 'conda_url' not in data:
                data['conda_url'] = 'https://conda.anaconda.org'
        except Exception as error:
            logger.error(str(error))

        return data


DOWNLOAD_API = None
REQUESTS_DOWNLOAD_API = None


def DownloadAPI(load_rc_func=None):
    """Download API based on Qt."""
    global DOWNLOAD_API

    if DOWNLOAD_API is None:
        DOWNLOAD_API = _DownloadAPI(load_rc_func=load_rc_func)

    return DOWNLOAD_API


def RequestsDownloadAPI(load_rc_func=None):
    """Download API threaded worker based on requests."""
    global REQUESTS_DOWNLOAD_API

    if REQUESTS_DOWNLOAD_API is None:
        REQUESTS_DOWNLOAD_API = _RequestsDownloadAPI(load_rc_func=load_rc_func)

    return REQUESTS_DOWNLOAD_API


# --- Local testing
# -----------------------------------------------------------------------------
def ready_print(worker, output, error):  # pragma: no cover
    """Print worker output for tests."""
    print(worker, output, error)


def test():  # pragma: no cover
    """Main local test."""
    from conda_manager.utils.qthelpers import qapplication
    urls = [
        'https://repo.continuum.io/pkgs/free/linux-64/repodata.json.bz2',
        'https://repo.continuum.io/pkgs/free/linux-64/repodata.json.bz2',
        'https://conda.anaconda.org/anaconda/linux-64/repodata.json.bz2',
        'https://conda.anaconda.org/asmeurer/linux-64/repodata.json.bz2',
        'https://conda.anaconda.org/conda-forge/linux-64/repodata.json.bz2',
            ]
    path = os.sep.join([os.path.expanduser('~'), 'testing-download'])
    app = qapplication()
    api = DownloadAPI()

    for i, url in enumerate(urls):
        filepath = os.path.join(path, str(i) + '.json.bz2')
        worker = api.download(url, filepath)
        worker.sig_finished.connect(ready_print)
        print('Downloading', url, filepath)

    path = os.sep.join([os.path.expanduser('~'), 'testing-download-requests'])
    api = RequestsDownloadAPI()
    urls += ['asdasdasdad']
    for i, url in enumerate(urls):
        worker = api.is_valid_url(url)
        worker.url = url
        worker.sig_finished.connect(ready_print)
        filepath = os.path.join(path, str(i) + '.json.bz2')
        worker = api.download(url, path=filepath, force=True)
        worker.sig_finished.connect(ready_print)

    api = RequestsDownloadAPI()
    print(api._is_valid_api_url('https://api.anaconda.org'))
    print(api._is_valid_api_url('https://conda.anaconda.org'))
    print(api._is_valid_channel('https://google.com'))
    print(api._is_valid_channel('https://conda.anaconda.org/continuumcrew'))
    print(api.get_api_info('https://api.anaconda.org'))
    sys.exit(app.exec_())


if __name__ == '__main__':  # pragma: no cover
    test()