Completed
Push — develop ( 0f3610...5e335d )
by
unknown
01:36
created

TestAccountAPI.test_get_quota_403()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright (c) 2013-2015 Online SAS and Contributors. All Rights Reserved.
4
#                         Julien Castets <[email protected]>
5
#                         Romain Gay <[email protected]>
6
#                         Kevin Deldycke <[email protected]>
7
#
8
# Licensed under the BSD 2-Clause License (the "License"); you may not use this
9
# file except in compliance with the License. You may obtain a copy of the
10
# License at http://opensource.org/licenses/BSD-2-Clause
11
12
import unittest
13
import uuid
14
15
import httpretty
16
import slumber
17
18
from scaleway.apis import AccountAPI
19
from scaleway.apis.api_account import BadToken, ExpiredToken
20
21
from . import FakeAPITestCase
22
23
24
class TestAccountAPI(FakeAPITestCase, unittest.TestCase):
25
26
    fake_permissions = {
27
        'compute': {
28
            'can_boot': ['server1', 'server2'],
29
            'can_delete': ['server1'],
30
        },
31
        'account': {
32
            'token:*': ['token1', 'token2'],
33
            'token:read': ['token2', 'token3'],
34
            'token:write': ['token4'],
35
        }
36
    }
37
38
    def setUp(self):
39
        super(TestAccountAPI, self).setUp()
40
        self.api = AccountAPI(
41
            base_url='http://compute.localhost',
42
            auth_token=str(uuid.uuid4())
43
        )
44
        self.fake_orga_key = str(uuid.uuid4())
45
46
    def make_fake_perms(self, permissions):
47
        self.fake_endpoint(
48
            self.api,
49
            'tokens/%s/permissions/' % self.api.auth_token,
50
            body={
51
                'permissions': permissions
52
            }
53
        )
54
55
    def make_fake_quotas(self, quotas):
56
        self.fake_endpoint(
57
            self.api,
58
            'organizations/%s/quotas/' % self.fake_orga_key,
59
            body={
60
                'quotas': quotas
61
            }
62
        )
63
64
    def test_perm_matches(self):
65
        # simple permissions
66
        self.assertTrue(self.api.perm_matches('read', 'read'))
67
        self.assertTrue(self.api.perm_matches(None, 'read'))
68
        self.assertFalse(self.api.perm_matches('write', 'read'))
69
        # wildcard
70
        self.assertTrue(self.api.perm_matches('read', '*'))
71
72
        # nested permissions
73
        self.assertTrue(self.api.perm_matches('object:read', 'object:read'))
74
        self.assertTrue(self.api.perm_matches('object:read', 'object:*'))
75
        self.assertTrue(self.api.perm_matches(None, 'object:*'))
76
        self.assertFalse(self.api.perm_matches('object:write', 'object:read'))
77
78
        # different nested sizes
79
        self.assertTrue(self.api.perm_matches('object:read:subperm', '*'))
80
        self.assertTrue(self.api.perm_matches('object:read:hello', 'object'))
81
        self.assertFalse(self.api.perm_matches('object', 'object:read:hello'))
82
83
    def test_get_resources(self):
84
85
        def compare_results(permissions, service=None, name=None,
86
                            resource=None, result=None, include_locked=False):
87
            """ Resets the auth API endpoint /tokens/:id/permissions, call
88
            get_resources and compare results with what is expected.
89
            """
90
            if result is None:
91
                result = []
92
93
            self.make_fake_perms(permissions)
94
            resources = self.api.get_resources(
95
                service=service, name=name, resource=resource,
96
                include_locked=include_locked
97
            )
98
            # XOR on two sets returns the difference between them
99
            # Used because we don't know in which order api.get_resources
100
            # returns the resources.
101
            self.assertFalse(set(resources) ^ set(result))
102
103
            # Check the API has been requested with
104
            # ?include_locked set to `include_locked`
105
            self.assertEqual(
106
                httpretty.last_request().querystring.get('include_locked'),
107
                [str(include_locked)]
108
            )
109
110
        # No permission, no resource
111
        compare_results({}, result=[])
112
113
        # Simple permissions
114
        compare_results(self.fake_permissions,
115
                        service='compute', name='can_boot',
116
                        result=['server1', 'server2'])
117
118
        compare_results(self.fake_permissions,
119
                        service='compute', name='can_boot',
120
                        result=['server1', 'server2'])
121
122
        compare_results(self.fake_permissions,
123
                        service='compute', name='can_boot', resource='server2',
124
                        result=['server2'])
125
126
        compare_results(self.fake_permissions,
127
                        service='compute', name='can_delete',
128
                        result=['server1'])
129
130
        compare_results(
131
            self.fake_permissions,
132
            service='compute', name='can_delete', resource='server1',
133
            result=['server1']
134
        )
135
136
        compare_results(self.fake_permissions,
137
                        service='compute', name='can_write',
138
                        result=[])
139
140
        # Nested permissions
141
        compare_results(self.fake_permissions,
142
                        service='account', name='token:read',
143
                        result=['token1', 'token2', 'token3'])
144
145
        compare_results(
146
            self.fake_permissions,
147
            service='account', name='token:read', resource='invalid',
148
            result=[]
149
        )
150
151
        compare_results(
152
            self.fake_permissions,
153
            service='account', name='token:read', resource='token2',
154
            result=['token2']
155
        )
156
157
        compare_results(
158
            self.fake_permissions,
159
            service='account', name='token:write',
160
            result=['token1', 'token2', 'token4']
161
        )
162
163
        compare_results(
164
            self.fake_permissions,
165
            service='account', name='token:admin',
166
            result=['token1', 'token2']
167
        )
168
169
        # Include lock set to True
170
        compare_results(
171
            self.fake_permissions,
172
            service='account', name='token:admin',
173
            result=['token1', 'token2'],
174
            include_locked=True
175
        )
176
177
    def test_get_resources_with_empty_token(self):
178
        self.api = AccountAPI()
179
        self.assertEqual(self.api.get_resources(), [])
180
181
    def test_get_resources_with_invalid_token(self):
182
        url = 'tokens/%s/permissions/' % (
183
            self.api.auth_token
184
        )
185
186
        self.fake_endpoint(self.api, url, status=400)
187
        self.assertRaises(BadToken, self.api.get_resources)
188
189
        self.fake_endpoint(self.api, url, status=404)
190
        self.assertRaises(BadToken, self.api.get_resources)
191
192
        self.fake_endpoint(self.api, url, status=410)
193
        self.assertRaises(ExpiredToken, self.api.get_resources)
194
195
        self.fake_endpoint(self.api, url, status=418)
196
        self.assertRaises(slumber.exceptions.SlumberHttpBaseException,
197
                          self.api.get_resources)
198
199
        self.fake_endpoint(self.api, url, status=500)
200
        self.assertRaises(slumber.exceptions.SlumberHttpBaseException,
201
                          self.api.get_resources)
202
203
    def test_has_perm(self):
204
205
        def has_perm(permissions, service=None, name=None, resource=None):
206
            """ Resets the auth API endpoint /tokens/:id/permissions and call
207
            api.has_perm.
208
            """
209
            self.make_fake_perms(permissions)
210
            return self.api.has_perm(service=service, name=name,
211
                                     resource=resource)
212
213
        self.assertTrue(
214
            has_perm(self.fake_permissions,
215
                     service='compute', name='can_boot', resource='server1')
216
        )
217
218
        self.assertTrue(
219
            has_perm(self.fake_permissions,
220
                     service='compute', name='can_boot', resource='server2')
221
        )
222
223
        self.assertFalse(
224
            has_perm(self.fake_permissions,
225
                     service='compute', name='can_boot', resource='server3')
226
        )
227
228
        self.assertTrue(
229
            has_perm(self.fake_permissions,
230
                     service='account', name='token:read', resource='token1')
231
        )
232
233
        self.assertTrue(
234
            has_perm(self.fake_permissions,
235
                     service='account', name='token:write', resource='token1')
236
        )
237
238
        self.assertTrue(
239
            has_perm(self.fake_permissions,
240
                     service='account', name='token:write', resource='token4')
241
        )
242
243
        self.assertFalse(
244
            has_perm(self.fake_permissions,
245
                     service='account', name='token:write', resource='token3')
246
        )
247
248
    def test_get_quota_403(self):
249
        url = 'organizations/%s/quotas/' % (
250
            self.fake_orga_key
251
        )
252
        self.fake_endpoint(self.api, url, status=403)
253
        self.assertRaises(slumber.exceptions.HttpClientError,
254
                          self.api.get_quotas,
255
                          self.fake_orga_key)
256
257
    def test_get_quotas(self):
258
        self.make_fake_quotas({'invites': 5})
259
        self.assertEqual(self.api.get_quotas(self.fake_orga_key),
260
                         {'invites': 5})
261
262
    def test_get_quota(self):
263
        self.make_fake_quotas({'invites': 5})
264
        self.assertEqual(self.api.get_quota(self.fake_orga_key, 'invites'), 5)
265
266
    def test_get_quota_None(self):
267
        self.make_fake_quotas({'invites': 5})
268
        self.assertEqual(self.api.get_quota(self.fake_orga_key, 'xoxo'), None)
269
270
    def test_has_quota(self):
271
        self.make_fake_quotas({'invites': 5})
272
        self.assertTrue(self.api.has_quota(self.fake_orga_key, 'invites', 2))
273
        self.assertFalse(self.api.has_quota(self.fake_orga_key, 'invites', 5))
274
        self.assertFalse(self.api.has_quota(self.fake_orga_key, 'nope'))
275