Binstar   F
last analyzed

Complexity

Total Complexity 70

Size/Duplication

Total Lines 546
Duplicated Lines 0 %

Importance

Changes 10
Bugs 0 Features 1
Metric Value
c 10
b 0
f 1
dl 0
loc 546
rs 2.7272
wmc 70

30 Methods

Rating   Name   Duplication   Size   Complexity  
A session() 0 3 1
B _authenticate() 0 39 1
A krb_authenticate() 0 7 2
A list_scopes() 0 5 1
A authentication() 0 8 1
A authentication_type() 0 9 2
A authenticate() 0 2 1
A check_server() 0 16 3
A authentications() 0 9 1
B __init__() 0 23 4
A remove_authentication() 0 14 3
B user_packages() 0 43 6
B add_release() 0 27 2
B download() 0 36 5
A release() 0 12 1
A package_remove_collaborator() 0 5 1
F _check_response() 0 38 13
A distribution() 0 7 1
A user_licenses() 0 6 1
A package() 0 11 1
B add_package() 0 40 1
A package_add_collaborator() 0 5 1
A package_collaborators() 0 6 1
A search() 0 9 1
A remove_dist() 0 12 3
A remove_package() 0 7 1
A user() 0 16 2
A all_packages() 0 8 1
C upload() 0 66 7
A remove_release() 0 12 1

How to fix   Complexity   

Complex Class

Complex classes like Binstar often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import, print_function, unicode_literals
2
3
import collections
4
import os
5
import requests
6
import warnings
7
import logging
8
import platform
9
10
from six import raise_from
11
from six.moves.urllib.parse import quote
12
13
# For backwards compatibility
14
from .errors import *
15
from .requests_ext import stream_multipart, NullAuth
16
17
from .utils import compute_hash, jencode, pv
18
from .utils.http_codes import STATUS_CODES
19
20
from .mixins.organizations import OrgMixin
21
from .mixins.channels import ChannelsMixin
22
from .mixins.package import PackageMixin
23
24
from . import errors
25
26
logger = logging.getLogger('binstar')
27
28
from ._version import get_versions
29
30
__version__ = get_versions()['version']
31
del get_versions
32
33
34
class Binstar(OrgMixin, ChannelsMixin, PackageMixin):
35
    """
36
    An object that represents interfaces with the Anaconda repository restful API.
37
38
    :param token: a token generated by Binstar.authenticate or None for
39
                  an anonymous user.
40
    """
41
42
    def __init__(self, token=None, domain='https://api.anaconda.org', verify=True, **kwargs):
43
        self._session = requests.Session()
44
        self._session.headers['x-binstar-api-version'] = __version__
45
        self.session.verify = verify
46
        self.session.auth = NullAuth()
47
        self.token = token
48
        self._token_warning_sent = False
49
50
        user_agent = 'Anaconda-Client/{} (+https://anaconda.org)'.format(__version__)
51
        self._session.headers.update({
52
            'User-Agent': user_agent,
53
            'Content-Type':'application/json',
54
            'Accept': 'application/json',
55
        })
56
57
        if token:
58
            self._session.headers.update({'Authorization': 'token {}'.format(token)})
59
60
        if domain.endswith('/'):
61
            domain = domain[:-1]
62
        if not domain.startswith(('http://', 'https://')):
63
            domain = 'https://' + domain
64
        self.domain = domain
65
66
    @property
67
    def session(self):
68
        return self._session
69
70
    def check_server(self):
71
        """
72
        Checks if the server is reachable and throws
73
        and exception if it isn't
74
        """
75
        msg = 'API server not found. Please check your API url configuration.'
76
77
        try:
78
            response = self.session.head(self.domain)
79
        except Exception as e:
80
            raise_from(errors.ServerError(msg), e)
81
82
        try:
83
            self._check_response(response)
84
        except errors.NotFound as e:
85
            raise raise_from(errors.ServerError(msg), e)
86
87
    def authentication_type(self):
88
        url = '%s/authentication-type' % self.domain
89
        res = self.session.get(url)
90
        try:
91
            self._check_response(res)
92
            res = res.json()
93
            return res['authentication_type']
94
        except BinstarError:
95
            return 'password'
96
97
    def krb_authenticate(self, *args, **kwargs):
98
        try:
99
            from requests_kerberos import HTTPKerberosAuth
100
            return self._authenticate(HTTPKerberosAuth(), *args, **kwargs)
101
        except ImportError:
102
            raise BinstarError(
103
                'Kerberos authentication requires the requests-kerberos '
104
                'package to be installed:\n'
105
                '    conda install requests-kerberos\n'
106
                'or: \n'
107
                '    pip install requests-kerberos'
108
            )
109
110
    def authenticate(self, username, password, *args, **kwargs):
111
        return self._authenticate((username, password), *args, **kwargs)
112
113
    def _authenticate(self,
114
                     auth,
115
                     application,
116
                     application_url=None,
117
                     for_user=None,
118
                     scopes=None,
119
                     created_with=None,
120
                     max_age=None,
121
                     strength='strong',
122
                     fail_if_already_exists=False,
123
                     hostname=platform.node()):
124
        '''
125
        Use basic authentication to create an authentication token using the interface below.
126
        With this technique, a username and password need not be stored permanently, and the user can
127
        revoke access at any time.
128
129
        :param username: The users name
130
        :param password: The users password
131
        :param application: The application that is requesting access
132
        :param application_url: The application's home page
133
        :param scopes: Scopes let you specify exactly what type of access you need. Scopes limit access for the tokens.
134
        '''
135
136
        url = '%s/authentications' % (self.domain)
137
        payload = {"scopes": scopes, "note": application, "note_url": application_url,
138
                   'hostname': hostname,
139
                   'user': for_user,
140
                   'max-age': max_age,
141
                   'created_with': None,
142
                   'strength': strength,
143
                   'fail-if-exists': fail_if_already_exists}
144
145
        data, headers = jencode(payload)
146
        res = self.session.post(url, auth=auth, data=data, headers=headers)
147
        self._check_response(res)
148
        res = res.json()
149
        token = res['token']
150
        self.session.headers.update({'Authorization': 'token %s' % (token)})
151
        return token
152
153
    def list_scopes(self):
154
        url = '%s/scopes' % (self.domain)
155
        res = requests.get(url)
156
        self._check_response(res)
157
        return res.json()
158
159
    def authentication(self):
160
        '''
161
        Retrieve information on the current authentication token
162
        '''
163
        url = '%s/authentication' % (self.domain)
164
        res = self.session.get(url)
165
        self._check_response(res)
166
        return res.json()
167
168
    def authentications(self):
169
        '''
170
        Get a list of the current authentication tokens
171
        '''
172
173
        url = '%s/authentications' % (self.domain)
174
        res = self.session.get(url)
175
        self._check_response(res)
176
        return res.json()
177
178
    def remove_authentication(self, auth_name=None, organization=None):
179
        """
180
        Remove the current authentication or the one given by `auth_name`
181
        """
182
        if auth_name:
183
            if organization:
184
                url = '%s/authentications/org/%s/name/%s' % (self.domain, organization, auth_name)
185
            else:
186
                url = '%s/authentications/name/%s' % (self.domain, auth_name)
187
        else:
188
            url = '%s/authentications' % (self.domain,)
189
190
        res = self.session.delete(url)
191
        self._check_response(res, [201])
192
193
    def _check_response(self, res, allowed=[200]):
194
        api_version = res.headers.get('x-binstar-api-version', '0.2.1')
195
        if pv(api_version) > pv(__version__):
196
            logger.warning('The api server is running the binstar-api version %s. you are using %s\nPlease update your '
197
                           'client with pip install -U binstar or conda update binstar' % (api_version, __version__))
198
199
        if not self._token_warning_sent and 'Conda-Token-Warning' in res.headers:
200
            logger.warning('Token warning: {}'.format(res.headers['Conda-Token-Warning']))
201
            self._token_warning_sent = True
202
203
        if 'X-Anaconda-Lockdown' in res.headers:
204
            logger.warning('Anaconda repository is currently in LOCKDOWN mode.')
205
206
        if 'X-Anaconda-Read-Only' in res.headers:
207
            logger.warning('Anaconda repository is currently in READ ONLY mode.')
208
209
        if not res.status_code in allowed:
210
            short, long = STATUS_CODES.get(res.status_code, ('?', 'Undefined error'))
211
            msg = '%s: %s ([%s] %s -> %s)' % (short, long, res.request.method, res.request.url, res.status_code)
212
213
            try:
214
                data = res.json()
215
            except:
216
                pass
217
            else:
218
                msg = data.get('error', msg)
219
220
            ErrCls = errors.BinstarError
221
            if res.status_code == 401:
222
                ErrCls = errors.Unauthorized
223
            elif res.status_code == 404:
224
                ErrCls = errors.NotFound
225
            elif res.status_code == 409:
226
                ErrCls = errors.Conflict
227
            elif res.status_code >= 500:
228
                ErrCls = errors.ServerError
229
230
            raise ErrCls(msg, res.status_code)
231
232
    def user(self, login=None):
233
        '''
234
        Get user information.
235
236
        :param login: (optional) the login name of the user or None. If login is None
237
                      this method will return the information of the authenticated user.
238
        '''
239
        if login:
240
            url = '%s/user/%s' % (self.domain, login)
241
        else:
242
            url = '%s/user' % (self.domain)
243
244
        res = self.session.get(url, verify=self.session.verify)
245
        self._check_response(res)
246
247
        return res.json()
248
249
    def user_packages(
250
            self,
251
            login=None,
252
            platform=None,
253
            package_type=None,
254
            type_=None,
255
            access=None):
256
        '''
257
        Returns a list of packages for a given user and optionally filter
258
        by `platform`, `package_type` and `type_`.
259
260
        :param login: (optional) the login name of the user or None. If login
261
                      is None this method will return the packages for the
262
                      authenticated user.
263
        :param platform: only find packages that include files for this platform.
264
           (e.g. 'linux-64', 'osx-64', 'win-32')
265
        :param package_type: only find packages that have this kind of file
266
           (e.g. 'env', 'conda', 'pypi')
267
        :param type_: only find packages that have this conda `type`
268
           (i.e. 'app')
269
        :param access: only find packages that have this access level
270
           (e.g. 'private', 'authenticated', 'public')
271
        '''
272
        if login:
273
            url = '{0}/packages/{1}'.format(self.domain, login)
274
        else:
275
            url = '{0}/packages'.format(self.domain)
276
277
        arguments = collections.OrderedDict()
278
279
        if platform:
280
            arguments['platform'] = platform
281
        if package_type:
282
            arguments['package_type'] = package_type
283
        if type_:
284
            arguments['type'] = type_
285
        if access:
286
            arguments['access'] = access
287
288
        res = self.session.get(url, params=arguments)
289
        self._check_response(res)
290
291
        return res.json()
292
293
    def package(self, login, package_name):
294
        '''
295
        Get information about a specific package
296
297
        :param login: the login of the package owner
298
        :param package_name: the name of the package
299
        '''
300
        url = '%s/package/%s/%s' % (self.domain, login, package_name)
301
        res = self.session.get(url)
302
        self._check_response(res)
303
        return res.json()
304
305
    def package_add_collaborator(self, owner, package_name, collaborator):
306
        url = '%s/packages/%s/%s/collaborators/%s' % (self.domain, owner, package_name, collaborator)
307
        res = self.session.put(url)
308
        self._check_response(res, [201])
309
        return
310
311
    def package_remove_collaborator(self, owner, package_name, collaborator):
312
        url = '%s/packages/%s/%s/collaborators/%s' % (self.domain, owner, package_name, collaborator)
313
        res = self.session.delete(url)
314
        self._check_response(res, [201])
315
        return
316
317
    def package_collaborators(self, owner, package_name):
318
319
        url = '%s/packages/%s/%s/collaborators' % (self.domain, owner, package_name)
320
        res = self.session.get(url)
321
        self._check_response(res, [200])
322
        return res.json()
323
324
    def all_packages(self, modified_after=None):
325
        '''
326
        '''
327
        url = '%s/package_listing' % (self.domain)
328
        data = {'modified_after':modified_after or ''}
329
        res = self.session.get(url, data=data)
330
        self._check_response(res)
331
        return res.json()
332
333
334
    def add_package(self, login, package_name,
335
                    summary=None,
336
                    license=None,
337
                    public=True,
338
                    license_url=None,
339
                    license_family=None,
340
                    attrs=None,
341
                    package_type=None):
342
        '''
343
        Add a new package to a users account
344
345
        :param login: the login of the package owner
346
        :param package_name: the name of the package to be created
347
        :param package_type: A type identifier for the package (eg. 'pypi' or 'conda', etc.)
348
        :param summary: A short summary about the package
349
        :param license: the name of the package license
350
        :param license_url: the url of the package license
351
        :param public: if true then the package will be hosted publicly
352
        :param attrs: A dictionary of extra attributes for this package
353
        '''
354
        url = '%s/package/%s/%s' % (self.domain, login, package_name)
355
356
        attrs = attrs or {}
357
        attrs['summary'] = summary
358
        attrs['package_types'] = [package_type]
359
        attrs['license'] = {
360
            'name': license,
361
            'url': license_url,
362
            'family': license_family,
363
        }
364
365
        payload = dict(public=bool(public),
366
                       publish=False,
367
                       public_attrs=dict(attrs or {})
368
                       )
369
370
        data, headers = jencode(payload)
371
        res = self.session.post(url, data=data, headers=headers)
372
        self._check_response(res)
373
        return res.json()
374
375
    def remove_package(self, username, package_name):
376
377
        url = '%s/package/%s/%s' % (self.domain, username, package_name)
378
379
        res = self.session.delete(url)
380
        self._check_response(res, [201])
381
        return
382
383
    def release(self, login, package_name, version):
384
        '''
385
        Get information about a specific release
386
387
        :param login: the login of the package owner
388
        :param package_name: the name of the package
389
        :param version: the name of the package
390
        '''
391
        url = '%s/release/%s/%s/%s' % (self.domain, login, package_name, version)
392
        res = self.session.get(url)
393
        self._check_response(res)
394
        return res.json()
395
396
    def remove_release(self, username, package_name, version):
397
        '''
398
        remove a release and all files under it
399
400
        :param username: the login of the package owner
401
        :param package_name: the name of the package
402
        :param version: the name of the package
403
        '''
404
        url = '%s/release/%s/%s/%s' % (self.domain, username, package_name, version)
405
        res = self.session.delete(url)
406
        self._check_response(res, [201])
407
        return
408
409
    def add_release(self, login, package_name, version, requirements, announce, release_attrs):
410
        '''
411
        Add a new release to a package.
412
413
        :param login: the login of the package owner
414
        :param package_name: the name of the package
415
        :param version: the version string of the release
416
        :param requirements: A dict of requirements TODO: describe
417
        :param announce: An announcement that will be posted to all package watchers
418
        '''
419
420
        url = '%s/release/%s/%s/%s' % (self.domain, login, package_name, version)
421
422
        if not release_attrs:
423
            release_attrs = {}
424
425
        payload = {
426
            'requirements': requirements,
427
            'announce': announce,
428
            'description': None,  # Will be updated with the one on release_attrs
429
        }
430
        payload.update(release_attrs)
431
432
        data, headers = jencode(payload)
433
        res = self.session.post(url, data=data, headers=headers)
434
        self._check_response(res)
435
        return res.json()
436
437
    def distribution(self, login, package_name, release, basename=None):
438
439
        url = '%s/dist/%s/%s/%s/%s' % (self.domain, login, package_name, release, basename)
440
441
        res = self.session.get(url)
442
        self._check_response(res)
443
        return res.json()
444
445
    def remove_dist(self, login, package_name, release, basename=None, _id=None):
446
447
        if basename:
448
            url = '%s/dist/%s/%s/%s/%s' % (self.domain, login, package_name, release, basename)
449
        elif _id:
450
            url = '%s/dist/%s/%s/%s/-/%s' % (self.domain, login, package_name, release, _id)
451
        else:
452
            raise TypeError("method remove_dist expects either 'basename' or '_id' arguments")
453
454
        res = self.session.delete(url)
455
        self._check_response(res)
456
        return res.json()
457
458
459
    def download(self, login, package_name, release, basename, md5=None):
460
        '''
461
        Download a package distribution
462
463
        :param login: the login of the package owner
464
        :param package_name: the name of the package
465
        :param version: the version string of the release
466
        :param basename: the basename of the distribution to download
467
        :param md5: (optional) an md5 hash of the download if given and the package has not changed
468
                    None will be returned
469
470
        :returns: a file like object or None
471
        '''
472
473
        url = '%s/download/%s/%s/%s/%s' % (self.domain, login, package_name, release, basename)
474
        if md5:
475
            headers = {'ETag':md5, }
476
        else:
477
            headers = {}
478
479
        res = self.session.get(url, headers=headers, allow_redirects=False)
480
        self._check_response(res, allowed=[200, 302, 304])
481
482
        if res.status_code == 200:
483
            # We received the content directly from anaconda.org
484
            return res
485
        elif res.status_code == 304:
486
            # The content has not changed
487
            return None
488
        elif res.status_code == 302:
489
            # Download from s3:
490
            # We need to create a new request (without using session) to avoid
491
            # sending the custom headers set on our session to S3 (which causes
492
            # a failure).
493
            res2 = requests.get(res.headers['location'], stream=True)
494
            return res2
495
496
497
    def upload(self, login, package_name, release, basename, fd, distribution_type,
498
               description='', md5=None, size=None, dependencies=None, attrs=None, channels=('main',), callback=None):
499
        '''
500
        Upload a new distribution to a package release.
501
502
        :param login: the login of the package owner
503
        :param package_name: the name of the package
504
        :param version: the version string of the release
505
        :param basename: the basename of the distribution to download
506
        :param fd: a file like object to upload
507
        :param distribution_type: pypi or conda or ipynb, etc
508
        :param description: (optional) a short description about the file
509
        :param attrs: any extra attributes about the file (eg. build=1, pyversion='2.7', os='osx')
510
511
        '''
512
        url = '%s/stage/%s/%s/%s/%s' % (self.domain, login, package_name, release, quote(basename))
513
        if attrs is None:
514
            attrs = {}
515
        if not isinstance(attrs, dict):
516
            raise TypeError('argument attrs must be a dictionary')
517
518
        payload = dict(distribution_type=distribution_type, description=description, attrs=attrs,
519
                       dependencies=dependencies, channels=channels)
520
521
        data, headers = jencode(payload)
522
        res = self.session.post(url, data=data, headers=headers)
523
        self._check_response(res)
524
        obj = res.json()
525
526
        s3url = obj['post_url']
527
        s3data = obj['form_data']
528
529
        if md5 is None:
530
            _hexmd5, b64md5, size = compute_hash(fd, size=size)
531
        elif size is None:
532
            spos = fd.tell()
533
            fd.seek(0, os.SEEK_END)
534
            size = fd.tell() - spos
535
            fd.seek(spos)
536
537
        s3data['Content-Length'] = size
538
        s3data['Content-MD5'] = b64md5
539
540
        data_stream, headers = stream_multipart(s3data, files={'file':(basename, fd)},
541
                                                callback=callback)
542
543
        request_method = self.session if s3url.startswith(self.domain) else requests
544
        s3res = request_method.post(
545
            s3url, data=data_stream,
546
            verify=self.session.verify, timeout=10 * 60 * 60,
547
            headers=headers
548
        )
549
550
        if s3res.status_code != 201:
551
            logger.info(s3res.text)
552
            logger.info('')
553
            logger.info('')
554
            raise errors.BinstarError('Error uploading package', s3res.status_code)
555
556
        url = '%s/commit/%s/%s/%s/%s' % (self.domain, login, package_name, release, quote(basename))
557
        payload = dict(dist_id=obj['dist_id'])
558
        data, headers = jencode(payload)
559
        res = self.session.post(url, data=data, headers=headers)
560
        self._check_response(res)
561
562
        return res.json()
563
564
    def search(self, query, package_type=None, platform=None):
565
        url = '%s/search' % self.domain
566
        res = self.session.get(url, params={
567
            'name': query,
568
            'type': package_type,
569
            'platform': platform,
570
        })
571
        self._check_response(res)
572
        return res.json()
573
574
    def user_licenses(self):
575
        """Download the user current trial/paid licenses."""
576
        url = '{domain}/license'.format(domain=self.domain)
577
        res = self.session.get(url)
578
        self._check_response(res)
579
        return res.json()
580
581
582
from ._version import get_versions
583
__version__ = get_versions()['version']
584
del get_versions
585