GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( cfccd9...b77fc8 )
by Gonzalo
27s
created

  F

Complexity

Total Complexity 74

Size/Duplication

Total Lines 406
Duplicated Lines 6.9 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 28
loc 406
rs 2.3809
wmc 74

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright © 2015- The Spyder Development Team
4
# Copyright © 2014-2015 Gonzalo Peña-Castellanos (@goanpeca)
5
#
6
# Licensed under the terms of the MIT License
7
# -----------------------------------------------------------------------------
8
"""Worker threads for using the anaconda-client api."""
9
10
# Standard library imports
11
from collections import deque
12
import bz2
13
import json
14
import logging
15
import os
16
import time
17
18
# Third party imports
19
from binstar_client.utils import get_config, set_config
20
from qtpy.QtCore import QObject, QThread, QTimer, Signal
21
import binstar_client
22
23
# Local imports
24
from conda_manager.api.conda_api import CondaAPI
25
from conda_manager.utils import constants as C
26
from conda_manager.utils import sort_versions
27
from conda_manager.utils.logs import logger
28 View Code Duplication
from conda_manager.utils.py3compat import to_text_string
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
29
30
31
class ClientWorker(QObject):
32
    """Anaconda Client API process worker."""
33
34
    sig_finished = Signal(object, object, object)
35
36
    def __init__(self, method, args, kwargs):
37
        """Anaconda Client API process worker."""
38
        super(ClientWorker, self).__init__()
39
        self.method = method
40
        self.args = args
41
        self.kwargs = kwargs
42
        self._is_finished = False
43
44
    def is_finished(self):
45
        """Return wether or not the worker has finished running the task."""
46
        return self._is_finished
47
48
    def start(self):
49
        """Start the worker process."""
50
        error, output = None, None
51
        try:
52
            time.sleep(0.1)
53
            output = self.method(*self.args, **self.kwargs)
54
        except Exception as err:
55
            logger.debug(str((self.method.__module__, self.method.__name__,
56
                              err)))
57
            error = str(err)
58
            error = error.replace('(', '')
59
            error = error.replace(')', '')
60
#            try:
61
#                error = err[0]
62
#            except Exception:
63
#                try:
64
#                    error = err.message
65
#                except Exception as err2:
66
#                    error = ''
67
68
        self.sig_finished.emit(self, output, str(error))
69
        self._is_finished = True
70
71
72
class _ClientAPI(QObject):
73
    """Anaconda Client API wrapper."""
74
75 View Code Duplication
    def __init__(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
76
        """Anaconda Client API wrapper."""
77
        super(QObject, self).__init__()
78
        self._anaconda_client_api = binstar_client.utils.get_server_api(
79
            log_level=logging.NOTSET)
80
        self._queue = deque()
81
        self._threads = []
82
        self._workers = []
83
        self._timer = QTimer()
84
        self._conda_api = CondaAPI()
85
86
        self._timer.setInterval(1000)
87
        self._timer.timeout.connect(self._clean)
88
89
    def _clean(self):
90
        """Check for inactive workers and remove their references."""
91
        if self._workers:
92
            for w in self._workers:
93
                if w.is_finished():
94
                    self._workers.remove(w)
95
96
        if self._threads:
97
            for t in self._threads:
98
                if t.isFinished():
99
                    self._threads.remove(t)
100
        else:
101
            self._timer.stop()
102
103
    def _start(self):
104
        """Take avalaible worker from the queue and start it."""
105
        if len(self._queue) == 1:
106
            thread = self._queue.popleft()
107
            thread.start()
108
            self._timer.start()
109
110
    def _create_worker(self, method, *args, **kwargs):
111
        """Create a worker for this client to be run in a separate thread."""
112 View Code Duplication
        # FIXME: this might be heavy...
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
113
        thread = QThread()
114
        worker = ClientWorker(method, args, kwargs)
115
        worker.moveToThread(thread)
116
        worker.sig_finished.connect(self._start)
117
        worker.sig_finished.connect(thread.quit)
118
        thread.started.connect(worker.start)
119
        self._queue.append(thread)
120
        self._threads.append(thread)
121
        self._workers.append(worker)
122
        self._start()
123
        return worker
124
125
    @staticmethod
126
    def _load_repodata(filepaths, extra_data=None, metadata=None):
127
        """Load all the available pacakges information.
128
129
        For downloaded repodata files (repo.continuum.io), additional
130
        data provided (anaconda cloud), and additional metadata and merge into
131
        a single set of packages and apps.
132
        """
133
        extra_data = extra_data if extra_data else {}
134
        metadata = metadata if metadata else {}
135
        repodata = []
136
        for filepath in filepaths:
137
            compressed = filepath.endswith('.bz2')
138
            mode = 'rb' if filepath.endswith('.bz2') else 'r'
139
140
            if os.path.isfile(filepath):
141
                with open(filepath, mode) as f:
142
                    raw_data = f.read()
143
144
                if compressed:
145
                    data = bz2.decompress(raw_data)
146
                else:
147
                    data = raw_data
148
149
                try:
150
                    data = json.loads(to_text_string(data, 'UTF-8'))
151
                except Exception as error:
152
                    logger.error(str(error))
153
                    data = {}
154
155
                repodata.append(data)
156
157
        all_packages = {}
158
        for data in repodata:
159
            packages = data.get('packages', {})
160
            for canonical_name in packages:
161
                data = packages[canonical_name]
162
                name, version, b = tuple(canonical_name.rsplit('-', 2))
163
164
                if name not in all_packages:
165
                    all_packages[name] = {'versions': set(),
166
                                          'size': {},
167
                                          'type': {},
168
                                          'app_entry': {},
169
                                          'app_type': {},
170
                                          }
171
                elif name in metadata:
172
                    temp_data = all_packages[name]
173
                    temp_data['home'] = metadata[name].get('home', '')
174
                    temp_data['license'] = metadata[name].get('license', '')
175
                    temp_data['summary'] = metadata[name].get('summary', '')
176
                    temp_data['latest_version'] = metadata[name].get('version')
177
                    all_packages[name] = temp_data
178
179
                all_packages[name]['versions'].add(version)
180
                all_packages[name]['size'][version] = data.get('size', '')
181
182
                # Only the latest builds will have the correct metadata for
183
                # apps, so only store apps that have the app metadata
184
                if data.get('type'):
185
                    all_packages[name]['type'][version] = data.get('type')
186
                    all_packages[name]['app_entry'][version] = data.get(
187
                        'app_entry')
188
                    all_packages[name]['app_type'][version] = data.get(
189
                        'app_type')
190
191
        all_apps = {}
192
        for name in all_packages:
193
            versions = sort_versions(list(all_packages[name]['versions']))
194
            all_packages[name]['versions'] = versions[:]
195
196
            for version in versions:
197
                has_type = all_packages[name].get('type')
198
                # Has type in this case implies being an app
199
                if has_type:
200
                    all_apps[name] = all_packages[name].copy()
201
                    # Remove all versions that are not apps!
202
                    versions = all_apps[name]['versions'][:]
203
                    types = all_apps[name]['type']
204
                    app_versions = [v for v in versions if v in types]
205
                    all_apps[name]['versions'] = app_versions
206
207
        return all_packages, all_apps
208
209
    @staticmethod
210
    def _prepare_model_data(packages, linked, pip=None,
211
                            private_packages=None):
212
        """Prepare model data for the packages table model."""
213
        pip = pip if pip else []
214
        private_packages = private_packages if private_packages else {}
215
216
        data = []
217
218
        if private_packages is not None:
219
            for pkg in private_packages:
220
                if pkg in packages:
221
                    p_data = packages.get(pkg)
222
                    versions = p_data.get('versions', '') if p_data else []
223
                    private_versions = private_packages[pkg]['versions']
224
                    all_versions = sort_versions(list(set(versions +
225
                                                          private_versions)))
226
                    packages[pkg]['versions'] = all_versions
227
                else:
228
                    private_versions = sort_versions(
229
                        private_packages[pkg]['versions'])
230
                    private_packages[pkg]['versions'] = private_versions
231
                    packages[pkg] = private_packages[pkg]
232
        else:
233
            private_packages = {}
234
235
        linked_packages = {}
236
        for canonical_name in linked:
237
            name, version, b = tuple(canonical_name.rsplit('-', 2))
238
            linked_packages[name] = {'version': version}
239
240
        pip_packages = {}
241
        for canonical_name in pip:
242
            name, version, b = tuple(canonical_name.rsplit('-', 2))
243
            pip_packages[name] = {'version': version}
244
245
        packages_names = sorted(list(set(list(linked_packages.keys()) +
246
                                         list(pip_packages.keys()) +
247
                                         list(packages.keys()) +
248
                                         list(private_packages.keys())
249
                                         )
250
                                     )
251
                                )
252
253
        for name in packages_names:
254
            p_data = packages.get(name)
255
256
            summary = p_data.get('summary', '') if p_data else ''
257
            url = p_data.get('home', '') if p_data else ''
258
            license_ = p_data.get('license', '') if p_data else ''
259
            versions = p_data.get('versions', '') if p_data else []
260
            version = p_data.get('latest_version', '') if p_data else ''
261
262
            if name in pip_packages:
263
                type_ = C.PIP_PACKAGE
264
                version = pip_packages[name].get('version', '')
265
                status = C.INSTALLED
266
            elif name in linked_packages:
267
                type_ = C.CONDA_PACKAGE
268
                version = linked_packages[name].get('version', '')
269
                status = C.INSTALLED
270
271
                if version in versions:
272
                    vers = versions
273
                    upgradable = not version == vers[-1] and len(vers) != 1
274
                    downgradable = not version == vers[0] and len(vers) != 1
275
276
                    if upgradable and downgradable:
277
                        status = C.MIXGRADABLE
278
                    elif upgradable:
279
                        status = C.UPGRADABLE
280
                    elif downgradable:
281
                        status = C.DOWNGRADABLE
282
            else:
283
                type_ = C.CONDA_PACKAGE
284
                status = C.NOT_INSTALLED
285
286
                if version == '' and len(versions) != 0:
287
                    version = versions[-1]
288
289
            row = {C.COL_ACTION: C.ACTION_NONE,
290
                   C.COL_PACKAGE_TYPE: type_,
291
                   C.COL_NAME: name,
292
                   C.COL_DESCRIPTION: summary.capitalize(),
293
                   C.COL_VERSION: version,
294
                   C.COL_STATUS: status,
295
                   C.COL_URL: url,
296
                   C.COL_LICENSE: license_,
297
                   C.COL_INSTALL: False,
298
                   C.COL_REMOVE: False,
299
                   C.COL_UPGRADE: False,
300
                   C.COL_DOWNGRADE: False,
301
                   C.COL_ACTION_VERSION: None
302
                   }
303
304
            data.append(row)
305
        return data
306
307
    # --- Public API
308
    # -------------------------------------------------------------------------
309
    def login(self, username, password, application, application_url):
310
        """Login to anaconda cloud."""
311
        logger.debug(str((username, application, application_url)))
312
        method = self._anaconda_client_api.authenticate
313
        return self._create_worker(method, username, password, application,
314
                                   application_url)
315
316
    def logout(self):
317
        """Logout from anaconda cloud."""
318
        logger.debug('Logout')
319
        method = self._anaconda_client_api.remove_authentication
320
        return self._create_worker(method)
321
322
    def load_repodata(self, filepaths, extra_data=None, metadata=None):
323
        """
324
        Load all the available pacakges information for downloaded repodata.
325
326
        Files include repo.continuum.io, additional data provided (anaconda
327
        cloud), and additional metadata and merge into a single set of packages
328
        and apps.
329
        """
330
        logger.debug(str((filepaths)))
331
        method = self._load_repodata
332
        return self._create_worker(method, filepaths, extra_data=extra_data,
333
                                   metadata=metadata)
334
335
    def prepare_model_data(self, packages, linked, pip=None,
336
                           private_packages=None):
337
        """Prepare downloaded package info along with pip pacakges info."""
338
        logger.debug('')
339
        return self._prepare_model_data(packages, linked, pip=pip,
340
                                        private_packages=private_packages)
341
342
    def set_domain(self, domain='https://api.anaconda.org'):
343
        """Reset current api domain."""
344
        logger.debug(str((domain)))
345
        config = binstar_client.utils.get_config()
346
        config['url'] = domain
347
        binstar_client.utils.set_config(config)
348
349
        self._anaconda_client_api = binstar_client.utils.get_server_api(
350
            token=None, log_level=logging.NOTSET)
351
352
        return self.user()
353
354
    @staticmethod
355
    def store_token(token):
356
        """Store authentication user token."""
357
        class Args:
358
            """Enum."""
359
360
            site = None
361
362
        binstar_client.utils.store_token(token, Args)
363
364
    @staticmethod
365
    def remove_token():
366
        """Remove authentication user token."""
367
        class Args:
368
            """Enum."""
369
370
            site = None
371
372
        binstar_client.utils.remove_token(Args)
373
374
    def user(self):
375
        """Return current logged user information."""
376
        try:
377
            user = self._anaconda_client_api.user()
378
        except Exception:
379
            user = {}
380
        return user
381
382
    def domain(self):
383
        """Return current domain."""
384
        return self._anaconda_client_api.domain
385
386
    def packages(self, login=None, platform=None, package_type=None,
387
                 type_=None, access=None):
388
        """Return all the available packages for a given user.
389
390
        Parameters
391
        ----------
392
        type_: Optional[str]
393
            Only find packages that have this conda `type`, (i.e. 'app').
394
        access : Optional[str]
395
            Only find packages that have this access level (e.g. 'private',
396
            'authenticated', 'public').
397
        """
398
        logger.debug('')
399
        method = self._anaconda_client_api.user_packages
400
        return self._create_worker(method, login=login, platform=platform,
401
                                   package_type=package_type,
402
                                   type_=type_, access=access)
403
404
    def _multi_packages(self, logins=None, platform=None, package_type=None,
405
                        type_=None, access=None, new_client=True):
406
        """Return the private packages for a given set of usernames/logins."""
407
        private_packages = {}
408
409
        if not new_client:
410
            time.sleep(0.3)
411
            return private_packages
412
413
        for login in logins:
414
            data = self._anaconda_client_api.user_packages(
415
                login=login,
416
                platform=platform,
417
                package_type=package_type,
418
                type_=type_,
419
                access=access)
420
            for item in data:
421
                name = item.get('name', '')
422
                public = item.get('public', True)
423
                package_types = item.get('package_types', [])
424
                latest_version = item.get('latest_version', '')
425
                if name and not public and 'conda' in package_types:
426
                    if name in private_packages:
427
                        versions = private_packages.get('versions', []),
428
                        new_versions = item.get('versions', []),
429
                        vers = sort_versions(list(set(versions +
430
                                                      new_versions)))
431
                        private_packages[name]['versions'] = vers
432
                        private_packages[name]['latest_version'] = vers[-1]
433
                    else:
434
                        private_packages[name] = {
435
                            'versions': item.get('versions', []),
436
                            'app_entry': {},
437
                            'type': {},
438
                            'size': {},
439
                            'latest_version': latest_version, }
440
441
        return private_packages
442
443
    def multi_packages(self, logins=None, platform=None, package_type=None,
444
                       type_=None, access=None):
445
        """Return the private packages for a given set of usernames/logins."""
446
        logger.debug('')
447
        method = self._multi_packages
448
        new_client = True
449
450
        try:
451
            # Only the newer versions have extra keywords like `access`
452
            self._anaconda_client_api.user_packages(access='private')
453
        except Exception:
454
            new_client = False
455
456
        return self._create_worker(method, logins=logins,
457
                                   platform=platform,
458
                                   package_type=package_type,
459
                                   type_=type_, access=access,
460
                                   new_client=new_client)
461
462
    def organizations(self, login=None):
463
        """List all the organizations a user has access to."""
464
        return self._anaconda_client_api.user(login=login)
465
466
    @staticmethod
467
    def load_token(url):
468
        """Load saved token for a given url api site."""
469
        token = binstar_client.utils.load_token(url)
470
        return token
471
472
    @staticmethod
473
    def get_api_url():
474
        """Get the anaconda client url configuration."""
475
        return get_config().get('url', 'https://api.anaconda.org')
476
477
    @staticmethod
478
    def set_api_url(url):
479
        """Set the anaconda client url configuration."""
480
        data = get_config()
481
        data['url'] = url
482
        set_config(data)
483
484
485
CLIENT_API = None
486
487
488
def ClientAPI():
489
    """Client API threaded worker."""
490
    global CLIENT_API
491
492
    if CLIENT_API is None:
493
        CLIENT_API = _ClientAPI()
494
495
    return CLIENT_API
496
497
498
def print_output(worker, output, error):  # pragma: no cover
499
    """Test helper print function."""
500
    print(output, error)
501
502
503
def test():  # pragma: no cover
504
    """Local main test."""
505
    from conda_manager.utils.qthelpers import qapplication
506
507
    app = qapplication()
508
    api = ClientAPI()
509
    api.login('goanpeca', 'asdasd', 'baby', '')
510
    api.login('bruce', 'asdasd', 'baby', '')
511
    api.set_domain(domain='https://api.beta.anaconda.org')
512
    worker = api.multi_packages(logins=['goanpeca'])
513
    worker.sig_finished.connect(print_output)
514
    worker = api.organizations(login='goanpeca')
515
    print(api.get_api_url())
516
    app.exec_()
517
518
519
if __name__ == '__main__':  # pragma: no cover
520
    test()
521