TestAccountAPI   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 251
rs 10
c 1
b 0
f 0
wmc 16

15 Methods

Rating   Name   Duplication   Size   Complexity  
A test_get_quota() 0 3 1
A compare_results() 0 23 2
A test_get_quota_403() 0 8 1
A make_fake_perms() 0 6 1
A test_get_quota_None() 0 3 1
A make_fake_quotas() 0 6 1
A test_has_quota() 0 5 1
B test_has_perm() 0 43 2
B test_get_resources() 0 92 3
A setUp() 0 7 1
A test_get_quotas() 0 4 1
A test_get_resources_with_empty_token() 0 3 1
A test_perm_matches() 0 18 1
A test_get_resources_with_invalid_token() 0 21 1
A has_perm() 0 7 1
1
# -*- coding: utf-8 -*-
2
#
3
# Copyright (c) 2013-2016 Online SAS and Contributors. All Rights Reserved.
4
#                         Julien Castets <[email protected]>
5
#                         Romain Gay <[email protected]>
6
#                         Kevin Deldycke <[email protected]>
7
#
8
# Licensed under the BSD 2-Clause License (the "License"); you may not use this
9
# file except in compliance with the License. You may obtain a copy of the
10
# License at https://opensource.org/licenses/BSD-2-Clause
11
12
import unittest
13
import uuid
14
15
import httpretty
16
import slumber
17
from scaleway.apis import AccountAPI
18
from scaleway.apis.api_account import BadToken, ExpiredToken
19
20
from . import FakeAPITestCase
21
22
23
class TestAccountAPI(FakeAPITestCase, unittest.TestCase):
    """ Tests for the permission and quota helpers of AccountAPI, run
    against endpoints faked with FakeAPITestCase.fake_endpoint.
    """

    # Payload served by the faked permissions endpoint in most tests:
    # {service: {permission name: [resources]}}.
    fake_permissions = {
        'compute': {
            'can_boot': ['server1', 'server2'],
            'can_delete': ['server1'],
        },
        'account': {
            'token:*': ['token1', 'token2'],
            'token:read': ['token2', 'token3'],
            'token:write': ['token4'],
        }
    }

    def setUp(self):
        """ Creates an AccountAPI with a random auth token, and a random
        organization key used by the quota tests.
        """
        super(TestAccountAPI, self).setUp()
        self.api = AccountAPI(
            base_url='http://compute.localhost',
            auth_token=str(uuid.uuid4())
        )
        self.fake_orga_key = str(uuid.uuid4())

    def make_fake_perms(self, permissions):
        """ Fakes /tokens/:id/permissions/ for the current auth token so
        it answers with `permissions`.
        """
        self.fake_endpoint(
            self.api,
            'tokens/%s/permissions/' % self.api.auth_token,
            body={
                'permissions': permissions
            }
        )

    def make_fake_quotas(self, quotas):
        """ Fakes /organizations/:id/quotas/ for the test organization so
        it answers with `quotas`.
        """
        self.fake_endpoint(
            self.api,
            'organizations/%s/quotas/' % self.fake_orga_key,
            body={
                'quotas': quotas
            }
        )

    def test_perm_matches(self):
        """ perm_matches handles plain, wildcard and nested permissions.
        """
        # simple permissions
        self.assertTrue(self.api.perm_matches('read', 'read'))
        self.assertTrue(self.api.perm_matches(None, 'read'))
        self.assertFalse(self.api.perm_matches('write', 'read'))
        # wildcard
        self.assertTrue(self.api.perm_matches('read', '*'))

        # nested permissions
        self.assertTrue(self.api.perm_matches('object:read', 'object:read'))
        self.assertTrue(self.api.perm_matches('object:read', 'object:*'))
        self.assertTrue(self.api.perm_matches(None, 'object:*'))
        self.assertFalse(self.api.perm_matches('object:write', 'object:read'))

        # different nested sizes
        self.assertTrue(self.api.perm_matches('object:read:subperm', '*'))
        self.assertTrue(self.api.perm_matches('object:read:hello', 'object'))
        self.assertFalse(self.api.perm_matches('object', 'object:read:hello'))

    def test_get_resources(self):
        """ get_resources filters by service, name and resource, and
        forwards include_locked in the query string.
        """

        def compare_results(permissions, service=None, name=None,
                            resource=None, result=None, include_locked=False):
            """ Resets the auth API endpoint /tokens/:id/permissions, call
            get_resources and compare results with what is expected.
            """
            if result is None:
                result = []

            self.make_fake_perms(permissions)
            resources = self.api.get_resources(
                service=service, name=name, resource=resource,
                include_locked=include_locked
            )
            # Compared as sets because we don't know in which order
            # api.get_resources returns the resources. assertEqual (rather
            # than asserting that the XOR of both sets is empty) reports
            # the missing/unexpected items when the check fails.
            self.assertEqual(set(resources), set(result))

            # Check the API has been requested with
            # ?include_locked set to `include_locked`
            self.assertEqual(
                httpretty.last_request().querystring.get('include_locked'),
                [str(include_locked)]
            )

        # No permission, no resource
        compare_results({}, result=[])

        # Simple permissions
        compare_results(self.fake_permissions,
                        service='compute', name='can_boot',
                        result=['server1', 'server2'])

        # NOTE(review): exact repeat of the previous check. Looks like a
        # copy-paste duplicate; kept until confirmed it is not intentional.
        compare_results(self.fake_permissions,
                        service='compute', name='can_boot',
                        result=['server1', 'server2'])

        compare_results(self.fake_permissions,
                        service='compute', name='can_boot', resource='server2',
                        result=['server2'])

        compare_results(self.fake_permissions,
                        service='compute', name='can_delete',
                        result=['server1'])

        compare_results(
            self.fake_permissions,
            service='compute', name='can_delete', resource='server1',
            result=['server1']
        )

        compare_results(self.fake_permissions,
                        service='compute', name='can_write',
                        result=[])

        # Nested permissions
        compare_results(self.fake_permissions,
                        service='account', name='token:read',
                        result=['token1', 'token2', 'token3'])

        compare_results(
            self.fake_permissions,
            service='account', name='token:read', resource='invalid',
            result=[]
        )

        compare_results(
            self.fake_permissions,
            service='account', name='token:read', resource='token2',
            result=['token2']
        )

        compare_results(
            self.fake_permissions,
            service='account', name='token:write',
            result=['token1', 'token2', 'token4']
        )

        compare_results(
            self.fake_permissions,
            service='account', name='token:admin',
            result=['token1', 'token2']
        )

        # Include lock set to True
        compare_results(
            self.fake_permissions,
            service='account', name='token:admin',
            result=['token1', 'token2'],
            include_locked=True
        )

    def test_get_resources_with_empty_token(self):
        """ Without an auth token, get_resources returns an empty list.
        """
        self.api = AccountAPI()
        self.assertEqual(self.api.get_resources(), [])

    def test_get_resources_with_invalid_token(self):
        """ HTTP errors on the permissions endpoint are raised as
        BadToken (400, 404), ExpiredToken (410) or a slumber HTTP
        exception (418, 500).
        """
        url = 'tokens/%s/permissions/' % self.api.auth_token

        self.fake_endpoint(self.api, url, status=400)
        self.assertRaises(BadToken, self.api.get_resources)

        self.fake_endpoint(self.api, url, status=404)
        self.assertRaises(BadToken, self.api.get_resources)

        self.fake_endpoint(self.api, url, status=410)
        self.assertRaises(ExpiredToken, self.api.get_resources)

        self.fake_endpoint(self.api, url, status=418)
        self.assertRaises(slumber.exceptions.SlumberHttpBaseException,
                          self.api.get_resources)

        self.fake_endpoint(self.api, url, status=500)
        self.assertRaises(slumber.exceptions.SlumberHttpBaseException,
                          self.api.get_resources)

    def test_has_perm(self):
        """ has_perm tells whether a resource is covered by a permission,
        including through the 'token:*' wildcard entry.
        """

        def has_perm(permissions, service=None, name=None, resource=None):
            """ Resets the auth API endpoint /tokens/:id/permissions and call
            api.has_perm.
            """
            self.make_fake_perms(permissions)
            return self.api.has_perm(service=service, name=name,
                                     resource=resource)

        self.assertTrue(
            has_perm(self.fake_permissions,
                     service='compute', name='can_boot', resource='server1')
        )

        self.assertTrue(
            has_perm(self.fake_permissions,
                     service='compute', name='can_boot', resource='server2')
        )

        self.assertFalse(
            has_perm(self.fake_permissions,
                     service='compute', name='can_boot', resource='server3')
        )

        self.assertTrue(
            has_perm(self.fake_permissions,
                     service='account', name='token:read', resource='token1')
        )

        self.assertTrue(
            has_perm(self.fake_permissions,
                     service='account', name='token:write', resource='token1')
        )

        self.assertTrue(
            has_perm(self.fake_permissions,
                     service='account', name='token:write', resource='token4')
        )

        self.assertFalse(
            has_perm(self.fake_permissions,
                     service='account', name='token:write', resource='token3')
        )

    def test_get_quota_403(self):
        """ A 403 on the quotas endpoint raises slumber's HttpClientError.
        """
        url = 'organizations/%s/quotas/' % self.fake_orga_key
        self.fake_endpoint(self.api, url, status=403)
        self.assertRaises(slumber.exceptions.HttpClientError,
                          self.api.get_quotas,
                          self.fake_orga_key)

    def test_get_quotas(self):
        """ get_quotas returns the quotas mapping of the organization.
        """
        self.make_fake_quotas({'invites': 5})
        self.assertEqual(self.api.get_quotas(self.fake_orga_key),
                         {'invites': 5})

    def test_get_quota(self):
        """ get_quota returns the value of a single named quota.
        """
        self.make_fake_quotas({'invites': 5})
        self.assertEqual(self.api.get_quota(self.fake_orga_key, 'invites'), 5)

    def test_get_quota_None(self):
        """ get_quota returns None for a quota that is not defined.
        """
        self.make_fake_quotas({'invites': 5})
        self.assertIsNone(self.api.get_quota(self.fake_orga_key, 'xoxo'))

    def test_has_quota(self):
        """ has_quota is true for 2 used out of 5, false for 5 used out
        of 5, and false for a quota that is not defined.
        """
        self.make_fake_quotas({'invites': 5})
        self.assertTrue(self.api.has_quota(self.fake_orga_key, 'invites', 2))
        self.assertFalse(self.api.has_quota(self.fake_orga_key, 'invites', 5))
        self.assertFalse(self.api.has_quota(self.fake_orga_key, 'nope'))
274