Passed
Push — develop ( f534b1...a82689 )
by Plexxi
06:09 queued 03:13
created

user_has_rule_action_permission()   A

Complexity

Conditions 3

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 17
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
RBAC related utility functions.
18
"""
19
20
import six
21
22
from oslo_config import cfg
23
24
from st2common.exceptions.rbac import AccessDeniedError
25
from st2common.exceptions.rbac import ResourceTypeAccessDeniedError
26
from st2common.exceptions.rbac import ResourceAccessDeniedError
27
from st2common.rbac.types import PermissionType
28
from st2common.rbac.types import ResourceType
29
from st2common.rbac.types import SystemRole
30
from st2common.rbac import resolvers
31
from st2common.services import rbac as rbac_services
32
from st2common.util import action_db as action_utils
33
34
# Public API of this module. Every public helper defined below is listed here so that
# "from <module> import *" exposes a consistent set of assert_* / user_* functions.
__all__ = [
    'user_has_rule_trigger_permission',
    'user_has_rule_action_permission',

    'assert_user_is_admin',
    'assert_user_is_system_admin',
    'assert_user_has_permission',
    'assert_user_has_resource_api_permission',
    'assert_user_has_resource_db_permission',

    'assert_user_is_admin_if_user_query_param_is_provided',

    'assert_user_has_rule_trigger_and_action_permission',

    'user_is_admin',
    'user_is_system_admin',
    'user_has_permission',
    'user_has_resource_api_permission',
    'user_has_resource_db_permission',
    'user_has_role',

    'get_user_db_from_request'
]
56
57
58
def assert_user_is_admin(user_db):
    """
    Ensure that the provided user is an administrator.

    :param user_db: User object to check.

    :raises AccessDeniedError: If ``user_is_admin`` reports that the user is not an
                               administrator.
    """
    if user_is_admin(user_db=user_db):
        return

    raise AccessDeniedError(message='Administrator access required',
                            user_db=user_db)
69
70
71
def assert_user_is_system_admin(user_db):
    """
    Ensure that the provided user is a system administrator.

    :param user_db: User object to check.

    :raises AccessDeniedError: If ``user_is_system_admin`` reports that the user is not a
                               system administrator.
    """
    if user_is_system_admin(user_db=user_db):
        return

    raise AccessDeniedError(message='System Administrator access required',
                            user_db=user_db)
82
83
84
def assert_user_has_permission(user_db, permission_type):
    """
    Ensure that the provided user has the specified permission.

    :param user_db: User object to check.
    :param permission_type: Permission type to check for.

    :raises ResourceTypeAccessDeniedError: If ``user_has_permission`` reports that the user
                                           doesn't have the permission.
    """
    if user_has_permission(user_db=user_db, permission_type=permission_type):
        return

    raise ResourceTypeAccessDeniedError(user_db=user_db, permission_type=permission_type)
94
95
96
def assert_user_has_resource_api_permission(user_db, resource_api, permission_type):
    """
    Ensure that the provided user has the specified permission for the resource API object
    (i.e. the resource which is to be created).

    :param user_db: User object to check.
    :param resource_api: Resource API object to check the permission against.
    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If ``user_has_resource_api_permission`` reports that
                                       the user doesn't have the permission.
    """
    granted = user_has_resource_api_permission(user_db=user_db,
                                               resource_api=resource_api,
                                               permission_type=permission_type)
    if granted:
        return

    # TODO: Refactor exception
    # Note: the API object is passed to the exception as its "resource_db" argument.
    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_api,
                                    permission_type=permission_type)
109
110
111
def assert_user_has_resource_db_permission(user_db, resource_db, permission_type):
    """
    Ensure that the provided user has the specified permission on the provided resource.

    :param user_db: User object to check.
    :param resource_db: Resource object to check the permission against.
    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If ``user_has_resource_db_permission`` reports that
                                       the user doesn't have the permission.
    """
    granted = user_has_resource_db_permission(user_db=user_db,
                                              resource_db=resource_db,
                                              permission_type=permission_type)
    if granted:
        return

    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_db,
                                    permission_type=permission_type)
124
125
126
def user_has_rule_trigger_permission(user_db, trigger):
    """
    Check whether the user has the necessary permission on the trigger used / referenced
    inside a rule.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) the check is skipped and True is
    returned.

    :param user_db: User object to check.
    :param trigger: Trigger definition which is passed to the rule resolver.

    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    # The decision is delegated to the resolver registered for the rule resource type.
    resolver = resolvers.get_resolver_for_resource_type(ResourceType.RULE)
    return bool(resolver.user_has_trigger_permission(user_db=user_db, trigger=trigger))
142
143
144
def user_has_rule_action_permission(user_db, action_ref):
    """
    Check whether the user has the necessary permission on the action used / referenced
    inside a rule.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) the check is skipped and True is
    returned.

    :param user_db: User object to check.
    :param action_ref: Reference of the action, used to look up the action object.

    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    # Resolve the reference to an action object first; the resolver works on that object.
    action_db = action_utils.get_action_by_ref(ref=action_ref)
    resolver = resolvers.get_resolver_for_resource_type(ResourceType.ACTION)
    granted = resolver.user_has_resource_db_permission(
        user_db=user_db, resource_db=action_db, permission_type=PermissionType.ACTION_EXECUTE)

    return bool(granted)
161
162
163
def assert_user_has_rule_trigger_and_action_permission(user_db, rule_api):
    """
    Ensure that the user has the necessary permissions on both the trigger and the action
    used / referenced inside the rule.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) no check is performed.

    :param user_db: User object to check.
    :param rule_api: Rule API object. Its ``trigger`` attribute is indexed with ``'type'`` and
                     its ``action`` attribute with ``'ref'``.

    :return: True if RBAC is disabled or both checks pass.

    :raises AccessDeniedError: If the user lacks the trigger or the action permission.
    """
    if not cfg.CONF.rbac.enable:
        return True

    trigger, action = rule_api.trigger, rule_api.action
    trigger_type, action_ref = trigger['type'], action['ref']

    # Check that user has access to the specified trigger - right now we only check for
    # webhook permissions
    if not user_has_rule_trigger_permission(user_db=user_db, trigger=trigger):
        msg = ('User "%s" doesn\'t have required permission (%s) to use trigger %s' %
               (user_db.name, PermissionType.WEBHOOK_CREATE, trigger_type))
        raise AccessDeniedError(message=msg, user_db=user_db)

    # Check that user has access to the specified action
    if not user_has_rule_action_permission(user_db=user_db, action_ref=action_ref):
        msg = ('User "%s" doesn\'t have required (%s) permission to use action %s' %
               (user_db.name, PermissionType.ACTION_EXECUTE, action_ref))
        raise AccessDeniedError(message=msg, user_db=user_db)

    return True
197
198
199
def assert_user_is_admin_if_user_query_param_is_provided(user_db, user):
    """
    Ensure that the requesting user is an administrator when the "user" query parameter value
    differs from the requesting user's own name.

    :param user_db: User object of the user performing the request.
    :param user: Value of the "user" query parameter. It is compared against ``user_db.name``.

    :raises AccessDeniedError: If ``user`` differs from ``user_db.name`` and the requesting
                               user is not an administrator.
    """
    is_admin = user_is_admin(user_db=user_db)
    targets_other_user = (user != user_db.name)

    if not targets_other_user or is_admin:
        return

    raise AccessDeniedError(message='"user" attribute can only be provided by admins',
                            user_db=user_db)
209
210
211
def user_is_admin(user_db):
    """
    Tell whether the provided user holds an admin role (either system admin or admin).

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :rtype: ``bool``
    """
    # System administrators count as administrators too, so that role is checked first.
    if user_is_system_admin(user_db=user_db):
        return True

    return bool(user_has_role(user_db=user_db, role=SystemRole.ADMIN))
230
231
232
def user_is_system_admin(user_db):
    """
    Tell whether the provided user holds the system admin role.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :rtype: ``bool``
    """
    has_system_admin_role = user_has_role(user_db=user_db, role=SystemRole.SYSTEM_ADMIN)
    return has_system_admin_role
242
243
244
def user_has_role(user_db, role):
    """
    Tell whether the provided user has been assigned a role with the given name.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) True is returned without looking
    up the user's roles.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :param role: Role name to check for.
    :type role: ``str``

    :rtype: ``bool``
    """
    assert isinstance(role, six.string_types)

    if not cfg.CONF.rbac.enable:
        return True

    # Collect the names of all the roles assigned to the user and test for membership.
    assigned_role_names = []
    for role_db in rbac_services.get_roles_for_user(user_db=user_db):
        assigned_role_names.append(role_db.name)

    return role in assigned_role_names
263
264
265
def user_has_permission(user_db, permission_type):
    """
    Check whether the provided user has the specified permission.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) True is returned. Otherwise the
    result of the resolver registered for the permission type is returned.

    :param user_db: User object to check.
    :param permission_type: Permission type to check for.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    permission_resolver = resolvers.get_resolver_for_permission_type(
        permission_type=permission_type)
    return permission_resolver.user_has_permission(user_db=user_db,
                                                   permission_type=permission_type)
276
277
278
def user_has_resource_api_permission(user_db, resource_api, permission_type):
    """
    Check whether the provided user has the specified permission on the provided resource API
    object.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) True is returned. Otherwise the
    result of the resolver registered for the permission type is returned.

    :param user_db: User object to check.
    :param resource_api: Resource API object to check the permission against.
    :param permission_type: Permission type to check for.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    permission_resolver = resolvers.get_resolver_for_permission_type(
        permission_type=permission_type)
    return permission_resolver.user_has_resource_api_permission(
        user_db=user_db, resource_api=resource_api, permission_type=permission_type)
290
291
292
def user_has_resource_db_permission(user_db, resource_db, permission_type):
    """
    Check whether the provided user has the specified permission on the provided resource.

    When RBAC is disabled (``cfg.CONF.rbac.enable`` is false) True is returned. Otherwise the
    result of the resolver registered for the permission type is returned.

    :param user_db: User object to check.
    :param resource_db: Resource object to check the permission against.
    :param permission_type: Permission type to check for.
    """
    if not cfg.CONF.rbac.enable:
        return True

    # TODO Verify permission type for the provided resource type
    permission_resolver = resolvers.get_resolver_for_permission_type(
        permission_type=permission_type)
    return permission_resolver.user_has_resource_db_permission(
        user_db=user_db, resource_db=resource_db, permission_type=permission_type)
304
305
306
def get_user_db_from_request(request):
    """
    Retrieve the user object stored in the auth context of the provided request.

    The user is read from ``request.context['auth']['user']``.

    :param request: Request object whose ``context`` attribute is a dict-like object.

    :return: Value stored under the ``user`` key of the auth context, or ``None`` if the
             request has no (or an empty) context, no (or an empty) auth context, or the auth
             context has no ``user`` entry.
    """
    # A missing or None context is treated the same way as a context without auth information
    # instead of failing with an AttributeError.
    context = getattr(request, 'context', None) or {}
    auth_context = context.get('auth') or {}

    return auth_context.get('user')
317