GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — plexxi-v2.3.2 ( 5d46fe )
by
unknown
06:59
created

user_has_rule_action_permission()   B

Complexity

Conditions 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 26
rs 8.5806
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
RBAC related utility functions.
18
"""
19
20
import six
21
22
from oslo_config import cfg
23
24
from st2common.models.db.action import ActionDB
25
from st2common.models.system.common import ResourceReference
26
from st2common.exceptions.rbac import AccessDeniedError
27
from st2common.exceptions.rbac import ResourceTypeAccessDeniedError
28
from st2common.exceptions.rbac import ResourceAccessDeniedError
29
from st2common.rbac.types import PermissionType
30
from st2common.rbac.types import ResourceType
31
from st2common.rbac.types import SystemRole
32
from st2common.rbac import resolvers
33
from st2common.services import rbac as rbac_services
34
from st2common.util import action_db as action_utils
35
36
# Public API of this module. Every non-private helper defined below is listed
# here so that ``from <module> import *`` exposes a complete, consistent set.
__all__ = [
    # Rule related checks which return a boolean
    'user_has_rule_trigger_permission',
    'user_has_rule_action_permission',

    # Assertions which raise an access denied error on failure
    'assert_user_is_admin',
    'assert_user_is_system_admin',
    'assert_user_is_admin_or_operating_on_own_resource',
    'assert_user_has_permission',
    # Previously missing from the list even though it is defined in this module
    # next to its "resource_db" sibling.
    'assert_user_has_resource_api_permission',
    'assert_user_has_resource_db_permission',

    'assert_user_is_admin_if_user_query_param_is_provided',

    'assert_user_has_rule_trigger_and_action_permission',

    # Boolean role / permission checks
    'user_is_admin',
    'user_is_system_admin',
    'user_has_permission',
    'user_has_resource_api_permission',
    'user_has_resource_db_permission',
    'user_has_role',

    # Request helpers
    'get_user_db_from_request'
]
59
60
61
def assert_user_is_admin(user_db):
    """
    Ensure that the provided user is an administrator.

    The check is delegated to ``user_is_admin``.

    :param user_db: User object to check.

    :raises AccessDeniedError: If the user is not an administrator.
    """
    if user_is_admin(user_db=user_db):
        return

    raise AccessDeniedError(message='Administrator access required', user_db=user_db)
72
73
74
def assert_user_is_system_admin(user_db):
    """
    Ensure that the provided user is a system administrator.

    The check is delegated to ``user_is_system_admin``.

    :param user_db: User object to check.

    :raises AccessDeniedError: If the user is not a system administrator.
    """
    if user_is_system_admin(user_db=user_db):
        return

    raise AccessDeniedError(message='System Administrator access required', user_db=user_db)
85
86
87
def assert_user_is_admin_or_operating_on_own_resource(user_db, user=None):
    """
    Ensure that the provided user is either an administrator or is operating on a resource
    which belongs to them.

    :param user_db: User object performing the operation.

    :param user: Name of the user owning the resource. When ``None`` the ownership check
                 never matches.

    :return: ``True`` when RBAC is disabled, ``None`` when the check passes.

    :raises AccessDeniedError: If RBAC is enabled and the user is neither an administrator
                               nor the owner.
    """
    if not cfg.CONF.rbac.enable:
        return True

    has_admin_role = user_is_admin(user_db=user_db)
    owns_resource = False if user is None else (user_db.name == user)

    if has_admin_role or owns_resource:
        return None

    raise AccessDeniedError(message='Administrator or self access required', user_db=user_db)
101
102
103
def assert_user_has_permission(user_db, permission_type):
    """
    Ensure that the provided user has been granted the specified permission.

    The check is delegated to ``user_has_permission``.

    :param user_db: User object to check.

    :param permission_type: Permission type to check for.

    :raises ResourceTypeAccessDeniedError: If the user doesn't have the required permission.
    """
    if user_has_permission(user_db=user_db, permission_type=permission_type):
        return

    raise ResourceTypeAccessDeniedError(user_db=user_db, permission_type=permission_type)
113
114
115
def assert_user_has_resource_api_permission(user_db, resource_api, permission_type):
    """
    Ensure that the provided user has the specified permission for the resource API object
    (i.e. a resource which is about to be created).

    The check is delegated to ``user_has_resource_api_permission``.

    :param user_db: User object to check.

    :param resource_api: Resource API object the permission is checked against.

    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If the user doesn't have the required permission.
    """
    granted = user_has_resource_api_permission(user_db=user_db, resource_api=resource_api,
                                               permission_type=permission_type)

    if granted:
        return

    # TODO: Refactor exception
    # Note: the API object is passed through the exception's "resource_db" argument.
    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_api,
                                    permission_type=permission_type)
128
129
130
def assert_user_has_resource_db_permission(user_db, resource_db, permission_type):
    """
    Ensure that the provided user has the specified permission on the provided resource.

    The check is delegated to ``user_has_resource_db_permission``.

    :param user_db: User object to check.

    :param resource_db: Resource DB object the permission is checked against.

    :param permission_type: Permission type to check for.

    :raises ResourceAccessDeniedError: If the user doesn't have the required permission.
    """
    granted = user_has_resource_db_permission(user_db=user_db, resource_db=resource_db,
                                              permission_type=permission_type)

    if granted:
        return

    raise ResourceAccessDeniedError(user_db=user_db, resource_db=resource_db,
                                    permission_type=permission_type)
143
144
145
def user_has_rule_trigger_permission(user_db, trigger):
    """
    Tell whether the provided user may use the trigger referenced inside a rule.

    The decision is made by the resolver registered for ``ResourceType.RULE``.

    :param user_db: User object to check.

    :param trigger: Trigger referenced inside the rule.

    :return: ``True`` if RBAC is disabled or the resolver grants access, ``False`` otherwise.
    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    resolver = resolvers.get_resolver_for_resource_type(ResourceType.RULE)
    allowed = resolver.user_has_trigger_permission(user_db=user_db, trigger=trigger)

    # Normalize whatever the resolver returned to a strict boolean
    return True if allowed else False
161
162
163
def user_has_rule_action_permission(user_db, action_ref):
    """
    Tell whether the provided user may execute the action referenced inside a rule.

    Note: Rules can reference actions which don't yet exist in the system. In that case the
    permission check runs against a placeholder ``ActionDB`` object built from the reference.

    :param user_db: User object to check.

    :param action_ref: Reference of the action used inside the rule.

    :return: ``True`` if RBAC is disabled or the user has ``ACTION_EXECUTE`` permission on the
             action, ``False`` otherwise.
    :rtype: ``bool``
    """
    if not cfg.CONF.rbac.enable:
        return True

    resource_db = action_utils.get_action_by_ref(ref=action_ref)

    if not resource_db:
        # Action is not registered (yet) - build an in-memory stand-in from the reference so
        # the resolver still has something to evaluate
        parsed_ref = ResourceReference.from_string_reference(ref=action_ref)
        resource_db = ActionDB(pack=parsed_ref.pack, name=parsed_ref.name, ref=action_ref)

    resolver = resolvers.get_resolver_for_resource_type(ResourceType.ACTION)
    allowed = resolver.user_has_resource_db_permission(
        user_db=user_db, resource_db=resource_db, permission_type=PermissionType.ACTION_EXECUTE)

    # Normalize whatever the resolver returned to a strict boolean
    return True if allowed else False
189
190
191
def assert_user_has_rule_trigger_and_action_permission(user_db, rule_api):
    """
    Ensure that the provided user has the necessary permissions on both the trigger and the
    action referenced inside the rule.

    :param user_db: User object to check.

    :param rule_api: Rule API object. Its ``trigger`` attribute is indexed by ``'type'`` and
                     its ``action`` attribute by ``'ref'``.

    :return: ``True`` when RBAC is disabled or both checks pass.
    :rtype: ``bool``

    :raises AccessDeniedError: If the user lacks access to the trigger or to the action.
    """
    if not cfg.CONF.rbac.enable:
        return True

    trigger = rule_api.trigger
    action = rule_api.action
    trigger_type, action_ref = trigger['type'], action['ref']

    # Trigger check - right now only webhook permissions are checked
    if not user_has_rule_trigger_permission(user_db=user_db, trigger=trigger):
        template = 'User "%s" doesn\'t have required permission (%s) to use trigger %s'
        details = (user_db.name, PermissionType.WEBHOOK_CREATE, trigger_type)
        raise AccessDeniedError(message=template % details, user_db=user_db)

    # Action check
    if not user_has_rule_action_permission(user_db=user_db, action_ref=action_ref):
        template = 'User "%s" doesn\'t have required (%s) permission to use action %s'
        details = (user_db.name, PermissionType.ACTION_EXECUTE, action_ref)
        raise AccessDeniedError(message=template % details, user_db=user_db)

    return True
225
226
227
def assert_user_is_admin_if_user_query_param_is_provided(user_db, user):
    """
    Ensure that the requesting user is an administrator when the "user" query parameter
    doesn't match the requesting user's name.

    Note: The comparison is a plain inequality check, so any ``user`` value which differs
    from ``user_db.name`` (including ``None``) requires administrator access.

    :param user_db: User object of the requesting user.

    :param user: Value of the "user" query parameter.

    :raises AccessDeniedError: If ``user`` differs from the requester's name and the
                               requester is not an administrator.
    """
    requester_is_admin = user_is_admin(user_db=user_db)
    targets_other_user = user != user_db.name

    if targets_other_user and not requester_is_admin:
        raise AccessDeniedError(message='"user" attribute can only be provided by admins',
                                user_db=user_db)
237
238
239
def user_is_admin(user_db):
    """
    Tell whether the provided user holds an admin role - either the system admin role or
    the regular admin role.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :return: ``True`` if the user has either role, ``False`` otherwise.
    :rtype: ``bool``
    """
    # System admins are always treated as admins, so that role is checked first
    if user_is_system_admin(user_db=user_db):
        return True

    return True if user_has_role(user_db=user_db, role=SystemRole.ADMIN) else False
258
259
260
def user_is_system_admin(user_db):
    """
    Tell whether the provided user holds the system admin role.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :return: Result of the ``user_has_role`` check for ``SystemRole.SYSTEM_ADMIN``.
    :rtype: ``bool``
    """
    has_system_admin_role = user_has_role(user_db=user_db, role=SystemRole.SYSTEM_ADMIN)
    return has_system_admin_role
270
271
272
def user_has_role(user_db, role):
    """
    Tell whether the provided user has been assigned the role with the given name.

    When RBAC is disabled in the config, every role check succeeds.

    :param user_db: User object to check for.
    :type user_db: :class:`UserDB`

    :param role: Role name to check for.
    :type role: ``str``

    :rtype: ``bool``
    """
    assert isinstance(role, six.string_types)

    if cfg.CONF.rbac.enable:
        assigned_role_dbs = rbac_services.get_roles_for_user(user_db=user_db)
        return role in [assigned_role_db.name for assigned_role_db in assigned_role_dbs]

    # RBAC is disabled - no role restrictions apply
    return True
291
292
293
def user_has_permission(user_db, permission_type):
    """
    Tell whether the provided user has been granted the specified permission.

    The decision is made by the resolver registered for the permission type.

    :param user_db: User object to check.

    :param permission_type: Permission type to check for.

    :return: ``True`` when RBAC is disabled, otherwise the resolver's answer.
    """
    if cfg.CONF.rbac.enable:
        # TODO Verify permission type for the provided resource type
        permission_resolver = resolvers.get_resolver_for_permission_type(
            permission_type=permission_type)
        return permission_resolver.user_has_permission(user_db=user_db,
                                                       permission_type=permission_type)

    return True
304
305
306
def user_has_resource_api_permission(user_db, resource_api, permission_type):
    """
    Tell whether the provided user has the specified permission on the provided resource
    API object.

    The decision is made by the resolver registered for the permission type.

    :param user_db: User object to check.

    :param resource_api: Resource API object the permission is checked against.

    :param permission_type: Permission type to check for.

    :return: ``True`` when RBAC is disabled, otherwise the resolver's answer.
    """
    if cfg.CONF.rbac.enable:
        # TODO Verify permission type for the provided resource type
        permission_resolver = resolvers.get_resolver_for_permission_type(
            permission_type=permission_type)
        return permission_resolver.user_has_resource_api_permission(
            user_db=user_db, resource_api=resource_api, permission_type=permission_type)

    return True
318
319
320
def user_has_resource_db_permission(user_db, resource_db, permission_type):
    """
    Tell whether the provided user has the specified permission on the provided resource
    DB object.

    The decision is made by the resolver registered for the permission type.

    :param user_db: User object to check.

    :param resource_db: Resource DB object the permission is checked against.

    :param permission_type: Permission type to check for.

    :return: ``True`` when RBAC is disabled, otherwise the resolver's answer.
    """
    if cfg.CONF.rbac.enable:
        # TODO Verify permission type for the provided resource type
        permission_resolver = resolvers.get_resolver_for_permission_type(
            permission_type=permission_type)
        return permission_resolver.user_has_resource_db_permission(
            user_db=user_db, resource_db=resource_db, permission_type=permission_type)

    return True
332
333
334
def get_user_db_from_request(request):
    """
    Look up the user object stored under the ``'auth'`` entry of the request context.

    :param request: Request object exposing a dict-like ``context`` attribute.

    :return: Value stored under ``context['auth']['user']``, or ``None`` when the auth
             context is missing / empty or contains no ``'user'`` entry.
    """
    auth_info = request.context.get('auth', {})
    return auth_info.get('user', None) if auth_info else None
345