Completed
Push — master ( 265317...7ca1a7 )
by W
06:30
created

st2common.rbac.request_user_has_resource_api_permission()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 8
rs 9.4286
cc 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
RBAC related utility functions.
18
"""
19
20
import six
21
22
from oslo_config import cfg
23
24
from st2common.exceptions.rbac import AccessDeniedError
25
from st2common.exceptions.rbac import ResourceTypeAccessDeniedError
26
from st2common.exceptions.rbac import ResourceAccessDeniedError
27
from st2common.rbac.types import PermissionType
28
from st2common.rbac.types import ResourceType
29
from st2common.rbac.types import SystemRole
30
from st2common.rbac import resolvers
31
from st2common.services import rbac as rbac_services
32
from st2common.util import action_db as action_utils
33
34
# Public API of this module. Names are grouped the same way the definitions
# below are grouped: request-level checks, request-level assertions, user-level
# checks and helpers.
__all__ = [
    'request_user_is_admin',
    'request_user_is_system_admin',
    'request_user_has_role',
    'request_user_has_permission',
    'request_user_has_resource_api_permission',
    'request_user_has_resource_db_permission',

    'request_user_has_rule_trigger_permission',
    'request_user_has_rule_action_permission',

    'assert_request_user_is_admin',
    'assert_request_user_is_system_admin',
    'assert_request_user_has_permission',
    'assert_request_user_has_resource_api_permission',
    'assert_request_user_has_resource_db_permission',

    'assert_request_user_has_rule_trigger_and_action_permission',

    'user_is_admin',
    'user_is_system_admin',
    'user_has_permission',
    'user_has_resource_api_permission',
    'user_has_resource_db_permission',
    'user_has_role',

    'get_user_db_from_request'
]
61
62
63
def request_user_is_admin(request):
    """
    Check if the logged-in request user has admin (either system admin or admin) role.

    :param request: Request object the user is resolved from.

    :rtype: ``bool``
    """
    return user_is_admin(user_db=get_user_db_from_request(request=request))
71
72
73
def request_user_is_system_admin(request):
    """
    Check if the logged-in request user has system admin role.

    :param request: Request object the user is resolved from.

    :rtype: ``bool``
    """
    return user_is_system_admin(user_db=get_user_db_from_request(request=request))
81
82
83
def request_user_has_role(request, role):
    """
    Check if the logged-in request user has the provided role.

    When authentication is disabled (``auth.enable`` config option is false) the check
    always passes and the request is not inspected.

    :param request: Request object the user is resolved from.

    :param role: Name of the role to check for.
    :type role: ``str``

    :rtype: ``bool``
    """
    # TODO: Once RBAC is implemented, we should not support running production (non-dev)
    # deployments with auth disabled.
    if not cfg.CONF.auth.enable:
        return True

    user_db = get_user_db_from_request(request=request)
    return user_has_role(user_db=user_db, role=role)
99
100
101
def request_user_has_permission(request, permission_type):
    """
    Check that the currently logged-in user has the specified permission.

    :param request: Request object the user is resolved from.
    :param permission_type: Permission type to check for.

    :rtype: ``bool``
    """
    user_db = get_user_db_from_request(request=request)
    return user_has_permission(user_db=user_db, permission_type=permission_type)
109
110
111
def request_user_has_resource_api_permission(request, resource_api, permission_type):
    """
    Check that the currently logged-in user has the specified permission for the resource
    which is to be created.

    :param request: Request object the user is resolved from.
    :param resource_api: Resource API object the permission is checked against.
    :param permission_type: Permission type to check for.

    :rtype: ``bool``
    """
    user_db = get_user_db_from_request(request=request)
    return user_has_resource_api_permission(user_db=user_db, resource_api=resource_api,
                                            permission_type=permission_type)
119
120
121
def request_user_has_resource_db_permission(request, resource_db, permission_type):
    """
    Check that the currently logged-in user has the specified permission on the provided
    resource.

    :param request: Request object the user is resolved from.
    :param resource_db: Resource DB object the permission is checked against.
    :param permission_type: Permission type to check for.

    :rtype: ``bool``
    """
    user_db = get_user_db_from_request(request=request)
    return user_has_resource_db_permission(user_db=user_db, resource_db=resource_db,
                                           permission_type=permission_type)
130
131
132
def assert_request_user_is_admin(request):
    """
    Assert that the currently logged in user is an administrator.

    :param request: Request object the user is resolved from.

    :raises AccessDeniedError: If the user is not an administrator.
    """
    if request_user_is_admin(request=request):
        return

    user_db = get_user_db_from_request(request=request)
    raise AccessDeniedError(message='Administrator access required',
                            user_db=user_db)
144
145
146
def assert_request_user_is_system_admin(request):
    """
    Assert that the currently logged in user is a system administrator.

    :param request: Request object the user is resolved from.

    :raises AccessDeniedError: If the user is not a system administrator.
    """
    if request_user_is_system_admin(request=request):
        return

    user_db = get_user_db_from_request(request=request)
    raise AccessDeniedError(message='System Administrator access required',
                            user_db=user_db)
158
159
160
def assert_request_user_has_permission(request, permission_type):
    """
    Assert that the currently logged-in user has the specified permission.

    :param request: Request object the user is resolved from.
    :param permission_type: Permission type to check for.

    :raises ResourceTypeAccessDeniedError: If the user doesn't have the required permission.
    """
    if request_user_has_permission(request=request, permission_type=permission_type):
        return

    user_db = get_user_db_from_request(request=request)
    raise ResourceTypeAccessDeniedError(user_db=user_db, permission_type=permission_type)
172
173
174
def assert_request_user_has_resource_api_permission(request, resource_api, permission_type):
    """
    Assert that the currently logged-in user has the specified permission for the resource
    which is to be created.

    :param request: Request object the user is resolved from.
    :param resource_api: Resource API object the permission is checked against.
    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If the user doesn't have the required permission.
    """
    has_permission = request_user_has_resource_api_permission(request=request,
                                                              resource_api=resource_api,
                                                              permission_type=permission_type)
    if has_permission:
        return

    user_db = get_user_db_from_request(request=request)
    # TODO: Refactor exception
    # NOTE: The exception only accepts ``resource_db`` so the API object is passed in its place.
    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_api,
                                    permission_type=permission_type)
188
189
190
def assert_request_user_has_resource_db_permission(request, resource_db, permission_type):
    """
    Assert that the currently logged-in user has the specified permission on the provided
    resource.

    :param request: Request object the user is resolved from.
    :param resource_db: Resource DB object the permission is checked against.
    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If the user doesn't have the required permission.
    """
    has_permission = request_user_has_resource_db_permission(request=request,
                                                             resource_db=resource_db,
                                                             permission_type=permission_type)
    if has_permission:
        return

    user_db = get_user_db_from_request(request=request)
    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_db,
                                    permission_type=permission_type)
204
205
206
def request_user_has_rule_trigger_permission(request, trigger):
    """
    Check that the currently logged-in user has necessary permissions on the trigger used /
    referenced inside the rule.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false).

    :param request: Request object the user is resolved from.
    :param trigger: Trigger referenced by the rule.

    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    user_db = get_user_db_from_request(request=request)
    rules_resolver = resolvers.get_resolver_for_resource_type(ResourceType.RULE)
    has_trigger_permission = rules_resolver.user_has_trigger_permission(user_db=user_db,
                                                                        trigger=trigger)

    # Normalize whatever the resolver returns to a strict boolean
    return bool(has_trigger_permission)
223
224
225
def request_user_has_rule_action_permission(request, action_ref):
    """
    Check that the currently logged-in user has necessary permissions on the action used /
    referenced inside the rule.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false). Otherwise
    the user needs the ``ACTION_EXECUTE`` permission on the referenced action.

    :param request: Request object the user is resolved from.
    :param action_ref: Reference of the action used by the rule.

    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    user_db = get_user_db_from_request(request=request)
    action_db = action_utils.get_action_by_ref(ref=action_ref)
    action_resolver = resolvers.get_resolver_for_resource_type(ResourceType.ACTION)
    has_action_permission = action_resolver.user_has_resource_db_permission(
        user_db=user_db, resource_db=action_db, permission_type=PermissionType.ACTION_EXECUTE)

    # Normalize whatever the resolver returns to a strict boolean
    return bool(has_action_permission)
243
244
245
def assert_request_user_has_rule_trigger_and_action_permission(request, rule_api):
    """
    Assert that the currently logged-in user has necessary permissions on the trigger and
    action used / referenced inside the rule.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false).

    :param request: Request object the user is resolved from.
    :param rule_api: Rule API object; its ``trigger['type']`` and ``action['ref']`` values
                     are read.

    :return: ``True`` if both checks pass.
    :rtype: ``bool``

    :raises AccessDeniedError: If the user lacks the trigger or the action permission.
    """
    if not cfg.CONF.rbac.enable:
        return True

    trigger = rule_api.trigger
    action = rule_api.action
    trigger_type = trigger['type']
    action_ref = action['ref']

    user_db = get_user_db_from_request(request=request)
    # The request may carry no user (get_user_db_from_request can return None); don't let
    # building the error message fail with AttributeError in that case.
    user_name = getattr(user_db, 'name', None)

    # Check that user has access to the specified trigger - right now we only check for
    # webhook permissions
    has_trigger_permission = request_user_has_rule_trigger_permission(request=request,
                                                                      trigger=trigger)

    if not has_trigger_permission:
        msg = ('User "%s" doesn\'t have required permission (%s) to use trigger %s' %
               (user_name, PermissionType.WEBHOOK_CREATE, trigger_type))
        raise AccessDeniedError(message=msg, user_db=user_db)

    # Check that user has access to the specified action
    has_action_permission = request_user_has_rule_action_permission(request=request,
                                                                    action_ref=action_ref)

    if not has_action_permission:
        msg = ('User "%s" doesn\'t have required (%s) permission to use action %s' %
               (user_name, PermissionType.ACTION_EXECUTE, action_ref))
        raise AccessDeniedError(message=msg, user_db=user_db)

    return True
281
282
283
def user_is_admin(user_db):
    """
    Return True if the provided user has admin role (either system admin or admin), False
    otherwise.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :rtype: ``bool``
    """
    # The system admin check runs first; the admin role is only looked up when it fails.
    return bool(user_is_system_admin(user_db=user_db) or
                user_has_role(user_db=user_db, role=SystemRole.ADMIN))
302
303
304
def user_is_system_admin(user_db):
    """
    Return True if the provided user has system admin role, False otherwise.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :rtype: ``bool``
    """
    return user_has_role(user_db=user_db, role=SystemRole.SYSTEM_ADMIN)
314
315
316
def user_has_role(user_db, role):
    """
    Check if the provided user has been assigned the provided role.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false).

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :param role: Role name to check for.
    :type role: ``str``

    :rtype: ``bool``
    """
    # The type check intentionally runs even when RBAC is disabled.
    assert isinstance(role, six.string_types)

    if not cfg.CONF.rbac.enable:
        return True

    user_role_dbs = rbac_services.get_roles_for_user(user_db=user_db)
    return any(role_db.name == role for role_db in user_role_dbs)
335
336
337
def user_has_permission(user_db, permission_type):
    """
    Check that the provided user has the specified permission.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false). Otherwise
    the check is delegated to the resolver registered for the permission type.

    :param user_db: User object to check for.
    :param permission_type: Permission type to check for.

    :return: Result returned by the resolver, or ``True`` when RBAC is disabled.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    resolver = resolvers.get_resolver_for_permission_type(permission_type=permission_type)
    return resolver.user_has_permission(user_db=user_db, permission_type=permission_type)
348
349
350
def user_has_resource_api_permission(user_db, resource_api, permission_type):
    """
    Check that the provided user has the specified permission on the provided resource API.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false). Otherwise
    the check is delegated to the resolver registered for the permission type.

    :param user_db: User object to check for.
    :param resource_api: Resource API object the permission is checked against.
    :param permission_type: Permission type to check for.

    :return: Result returned by the resolver, or ``True`` when RBAC is disabled.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    resolver = resolvers.get_resolver_for_permission_type(permission_type=permission_type)
    return resolver.user_has_resource_api_permission(user_db=user_db,
                                                     resource_api=resource_api,
                                                     permission_type=permission_type)
362
363
364
def user_has_resource_db_permission(user_db, resource_db, permission_type):
    """
    Check that the provided user has the specified permission on the provided resource.

    Always passes when RBAC is disabled (``rbac.enable`` config option is false). Otherwise
    the check is delegated to the resolver registered for the permission type.

    :param user_db: User object to check for.
    :param resource_db: Resource DB object the permission is checked against.
    :param permission_type: Permission type to check for.

    :return: Result returned by the resolver, or ``True`` when RBAC is disabled.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    resolver = resolvers.get_resolver_for_permission_type(permission_type=permission_type)
    return resolver.user_has_resource_db_permission(user_db=user_db,
                                                    resource_db=resource_db,
                                                    permission_type=permission_type)
376
377
378
def get_user_db_from_request(request):
    """
    Retrieve UserDB object from the provided request.

    :param request: Request object; its ``context`` mapping is inspected for an ``auth``
                    entry.

    :return: Value stored under the ``user`` key of the auth context, or ``None`` if the
             auth context is missing / empty or holds no user.
    """
    # A missing or falsy auth context is treated as an empty one
    auth_context = request.context.get('auth', {}) or {}
    return auth_context.get('user', None)
389