AccountAPI.get_quotas()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 5
rs 9.4285
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright (c) 2013-2016 Online SAS and Contributors. All Rights Reserved.
4
#                         Julien Castets <[email protected]>
5
#                         Romain Gay <[email protected]>
6
#                         Kevin Deldycke <[email protected]>
7
#
8
# Licensed under the BSD 2-Clause License (the "License"); you may not use this
9
# file except in compliance with the License. You may obtain a copy of the
10
# License at https://opensource.org/licenses/BSD-2-Clause
11
12
import slumber
13
from six.moves import zip_longest
14
15
from . import API
16
17
18
class InvalidToken(Exception):
    """ Base class of the errors raised when the Account API does not accept
    the auth token.
    """
20
21
22
class ExpiredToken(InvalidToken):
    """ Raised when the Account API answers with a 410 status for the token.
    """
24
25
26
class BadToken(InvalidToken):
    """ Raised when the Account API answers with a 400 or 404 status for the
    token.
    """
28
29
30
class AccountAPI(API):
31
    """ Interacts with Scaleway Account API.
32
    """
33
    base_url = 'https://account.scaleway.com/'
34
35
    def perm_matches(self, request_perm, effective_perm):
        """ Tells whether `effective_perm` grants `request_perm`.

        A permission is a string made of parts separated by colon characters.
        Parts are compared pairwise, from left to right, and the first
        mismatch makes the check fail.

        A `*` part in `effective_perm` matches any part of `request_perm` at
        the same position. Once `effective_perm` has no part left, the
        remaining parts of `request_perm` are accepted.

        Examples (`api` being an AccountAPI instance):
        >>> api.perm_matches('request:auth:read', 'request:auth:*')
        True
        >>> api.perm_matches('request:auth:read', 'request:*')
        True
        >>> api.perm_matches('request:auth:read', 'request:log:*')
        False
        >>> api.perm_matches('request:log:write', 'request:log:read')
        False

        :param request_perm: Permission being asked for. None means "no
                             filter" and always matches.
        :param effective_perm: Permission actually granted to the token
        :returns: True if the permission is granted, False otherwise
        """
        # Nothing requested: there is nothing to check against.
        if request_perm is None:
            return True

        # zip_longest pads the shorter side with None. A None on the
        # effective side means the grant is broader than the request; a None
        # on the request side only passes against a `*`.
        part_pairs = zip_longest(request_perm.split(':'),
                                 effective_perm.split(':'))

        return all(
            granted is None or granted == '*' or granted == wanted
            for wanted, granted in part_pairs
        )
76
77
    def get_resources(self, service=None, name=None, resource=None,
78
                      include_locked=False):
79
        """ Gets a list of resources for which the auth token is granted.
80
81
        The permissions of a token is the sum of:
82
83
        - token's permissions
84
        - user's permissions
85
        - user's roles permissions
86
        - token's roles permissions
87
88
        Roles are linked to organizations.
89
90
        This function doesn't return the permissions retrieved from locked
91
        organizations unless `include_locked` is True. Setting `include_lock`
92
        to True is useful when you need to check the permissions of a token,
93
        but don't care if the owner's organization is locked or not.
94
95
        Note: If you - the reader - are not a staff member, this pydoc might be
96
        a little confusing. Roles and permissions are not yet fully exposed by
97
        our APIs, but I promise we will try to expose and document them very
98
        soon. Anyway, if you have questions, we'll be glad to answer you guys!
99
        """
100
        assert isinstance(include_locked, bool)
101
102
        if not self.auth_token:
103
            return []
104
105
        # GET /tokens/:id/permissions on account-api
106
        try:
107
            response = self.query() \
108
                           .tokens(self.auth_token) \
109
                           .permissions.get(include_locked=include_locked)
110
        except slumber.exceptions.HttpClientError as exc:
111
            if exc.response.status_code in (400, 404):
112
                raise BadToken()
113
114
            if exc.response.status_code == 410:
115
                raise ExpiredToken()
116
117
            raise
118
119
        # Apply filters on effective permissions
120
        #
121
        # >>> print response.get('permissions')
122
        # {
123
        #   'service_name': {
124
        #      'perm_name': ['resource1', 'resource2', ...],
125
        #      ...
126
        #   },
127
        #   ...
128
        # }
129
        ret = []
130
131
        for (eff_service_name,
132
             eff_service_perms) in response.get('permissions', {}).items():
133
134
            # Filter on service
135
            if eff_service_name == service or service is None:
136
137
                # Filter on perms
138
                for (eff_perm_name,
139
                     eff_perm_resources) in eff_service_perms.items():
140
141
                    if self.perm_matches(name, eff_perm_name):
142
143
                        # Filter on resources
144
                        ret.extend([
145
                            eff_perm_resource
146
                            for eff_perm_resource in eff_perm_resources
147
                            if self.perm_matches(resource, eff_perm_resource)
148
                        ])
149
150
        return list(set(ret))
151
152
    def has_perm(self, service=None, name=None, resource=None,
153
                 include_locked=False):
154
        """ Checks if the token has a permission.
155
        """
156
        return bool(
157
            self.get_resources(service=service, name=name, resource=resource,
158
                               include_locked=include_locked)
159
        )
160
161
    def get_quotas(self, organization):
162
        """ Gets a list of quotas for the given organization.
163
        """
164
        response = self.query().organizations(organization).quotas.get()
165
        return response['quotas']
166
167
    def get_quota(self, organization, resource):
168
        """ Gets one quota for the given organization.
169
        """
170
        quotas = self.get_quotas(organization)
171
        return quotas.get(resource)
172
173
    def has_quota(self, organization, resource, used=None):
174
        """ Checks if `organization` has the quota set for `resource`, and if
175
        `used` is not None, also checks if the quota value is higher than
176
        `used`.
177
        """
178
        quotas = self.get_quotas(organization=organization)
179
180
        # Check if the quota is set
181
        quota_value = quotas.get(resource)
182
        if quota_value is None:
183
            return False
184
185
        # If `used` is not None, check it is lower than `quota_value`
186
        if used is not None and used >= quota_value:
187
            return False
188
189
        return True
190