Passed
Push — master ( c6514d...e923b9 )
by
unknown
03:33
created

assign_role_to_user()   B

Complexity

Conditions 1

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 26
rs 8.8571
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from mongoengine.queryset.visitor import Q
17
18
from st2common.rbac.types import PermissionType
19
from st2common.rbac.types import ResourceType
20
from st2common.rbac.types import SystemRole
21
from st2common.persistence.rbac import Role
22
from st2common.persistence.rbac import UserRoleAssignment
23
from st2common.persistence.rbac import PermissionGrant
24
from st2common.persistence.rbac import GroupToRoleMapping
25
from st2common.models.db.rbac import RoleDB
26
from st2common.models.db.rbac import UserRoleAssignmentDB
27
from st2common.models.db.rbac import PermissionGrantDB
28
from st2common.models.db.rbac import GroupToRoleMappingDB
29
30
31
# Public API of this module, grouped by the kind of object each helper works with.
__all__ = [
    # Role and role assignment lookups
    'get_all_roles',
    'get_system_roles',
    'get_roles_for_user',
    'get_all_role_assignments',
    'get_role_assignments_for_user',
    'get_role_by_name',

    # Role management
    'create_role',
    'delete_role',

    # Role assignment management
    'assign_role_to_user',
    'revoke_role_from_user',

    # Permission grants
    'get_all_permission_grants_for_user',
    'create_permission_grant',
    'create_permission_grant_for_resource_db',
    'remove_permission_grant_for_resource_db',

    # Group to role mappings
    'get_all_group_to_role_maps',
    'create_group_to_role_map',

    # Validation
    'validate_roles_exists',
]
55
56
57
def get_all_roles(exclude_system=False):
    """
    Return the roles stored in the database.

    :param exclude_system: When True, only roles matching ``system=False`` are returned,
                           otherwise every role is returned.
    :type exclude_system: ``bool``

    :rtype: ``list`` of :class:`RoleDB`
    """
    if not exclude_system:
        return Role.get_all()

    # Caller asked for user-defined roles only
    return Role.query(system=False)
72
73
74
def get_system_roles():
    """
    Return the roles which are flagged as system roles (``system=True``).

    :rtype: ``list`` of :class:`RoleDB`
    """
    return Role.query(system=True)
82
83
84
def get_roles_for_user(user_db, include_remote=True):
    """
    Return the roles which are assigned to the provided user.

    The role names are read from the user's role assignments and then resolved to role
    objects with a single ``name__in`` query.

    :param user_db: User to retrieve the roles for. Only ``user_db.name`` is used.
    :type user_db: :class:`UserDB`

    :param include_remote: True to also take remote role assignments into account.
    :type include_remote: ``bool``

    :rtype: ``list`` of :class:`RoleDB`
    """
    if not include_remote:
        # Note: We also include documents with no "is_remote" field so it also works correctly
        # when upgrading from pre v2.3.0 when this field didn't exist yet
        local_only = Q(is_remote=False) | Q(is_remote__exists=False)
        assignments = UserRoleAssignmentDB.objects(Q(user=user_db.name) & local_only)
    else:
        assignments = UserRoleAssignment.query(user=user_db.name)

    # Only the "role" field is needed to resolve the assignments to roles
    assigned_role_names = assignments.only('role').scalar('role')
    return Role.query(name__in=assigned_role_names)
108
109
110
def get_all_role_assignments(include_remote=True):
    """
    Return the role assignment objects for all the users.

    :param include_remote: True to also include remote role assignments.
    :type include_remote: ``bool``

    :rtype: ``list`` of :class:`UserRoleAssignmentDB`
    """
    if include_remote:
        return UserRoleAssignment.query()

    # Note: Documents with no "is_remote" field are matched as well so this also works
    # correctly when upgrading from pre v2.3.0 when this field didn't exist yet
    local_only_filter = Q(is_remote=False) | Q(is_remote__exists=False)
    return UserRoleAssignmentDB.objects(local_only_filter)
128
129
130
def get_role_assignments_for_user(user_db, include_remote=True):
    """
    Return the role assignment objects which belong to the provided user.

    :param user_db: User to retrieve the role assignments for. Only ``user_db.name`` is used.
    :type user_db: :class:`UserDB`

    :param include_remote: True to also include remote role assignments.
    :type include_remote: ``bool``

    :rtype: ``list`` of :class:`UserRoleAssignmentDB`
    """
    if include_remote:
        return UserRoleAssignment.query(user=user_db.name)

    # Note: Documents with no "is_remote" field are matched as well so this also works
    # correctly when upgrading from pre v2.3.0 when this field didn't exist yet
    local_only = Q(is_remote=False) | Q(is_remote__exists=False)
    return UserRoleAssignmentDB.objects(Q(user=user_db.name) & local_only)
152
153
154
def get_role_by_name(name):
    """
    Look up a role by its name.

    :param name: Name of the role to retrieve.
    :type name: ``str``

    :return: Result of ``Role.get(name=name)`` (presumably a single role object and not a
             list - confirm against the persistence layer).
    :rtype: :class:`RoleDB`
    """
    return Role.get(name=name)
162
163
164
def create_role(name, description=None):
    """
    Create and persist a new role.

    :param name: Name of the new role. Must not be one of the system role names.
    :type name: ``str``

    :param description: Optional role description.
    :type description: ``str``

    :raises ValueError: If the name is one of the values returned by
                        ``SystemRole.get_valid_values()``.

    :rtype: :class:`RoleDB`
    """
    reserved_names = SystemRole.get_valid_values()

    if name in reserved_names:
        raise ValueError('"%s" role name is blacklisted' % (name))

    return Role.add_or_update(RoleDB(name=name, description=description))
174
175
176
def delete_role(name):
    """
    Delete the role with the provided name.

    :param name: Name of the role to delete. Must not be one of the system role names.
    :type name: ``str``

    :raises ValueError: If the name is one of the values returned by
                        ``SystemRole.get_valid_values()``.

    :return: Result of the ``Role.delete`` call.
    """
    if name in SystemRole.get_valid_values():
        raise ValueError('System roles can\'t be deleted')

    # The role object is retrieved first since Role.delete operates on it and not on the name
    return Role.delete(Role.get(name=name))
186
187
188
def assign_role_to_user(role_db, user_db, description=None, is_remote=False, source=None):
    """
    Create and persist an assignment of the provided role to the provided user.

    :param role_db: Role to assign. Only ``role_db.name`` is stored on the assignment.
    :type role_db: :class:`RoleDB`

    :param user_db: User to assign the role to. Only ``user_db.name`` is stored on the
                    assignment.
    :type user_db: :class:`UserDB`

    :param description: Optional assignment description.
    :type description: ``str``

    :param is_remote: True if this is a remote assignment.
    :type is_remote: ``bool``

    :param source: Source from where this assignment comes from. For example, path of a file if
                   it's a local assignment or mapping or "API".
    :type source: ``str``

    :rtype: :class:`UserRoleAssignmentDB`
    """
    assignment_fields = {
        'user': user_db.name,
        'role': role_db.name,
        'source': source,
        'description': description,
        'is_remote': is_remote
    }
    new_assignment_db = UserRoleAssignmentDB(**assignment_fields)

    return UserRoleAssignment.add_or_update(new_assignment_db)
214
215
216
def revoke_role_from_user(role_db, user_db):
    """
    Delete every assignment of the provided role to the provided user.

    :param role_db: Role to revoke. Assignments are matched on ``role_db.name``.
    :type role_db: :class:`RoleDB`

    :param user_db: User to revoke the role from. Assignments are matched on ``user_db.name``.
    :type user_db: :class:`UserDB`
    """
    assignment_filters = {'user': user_db.name, 'role': role_db.name}
    matching_assignment_dbs = UserRoleAssignment.query(**assignment_filters)

    # More than one assignment can match, so each of them is deleted
    for assignment_db in matching_assignment_dbs:
        UserRoleAssignment.delete(assignment_db)
230
231
232
def get_all_permission_grants_for_user(user_db, resource_uid=None, resource_types=None,
                                       permission_types=None):
    """
    Retrieve all the permission grants for a particular user optionally filtering on:

    - Resource uid
    - Resource types
    - Permission types

    The result is a union of all the permission grants assigned to the roles which are assigned to
    the user.

    :param user_db: User to retrieve the permission grants for. Only ``user_db.name`` is used.
    :type user_db: :class:`UserDB`

    :param resource_uid: Optional resource uid the grants need to match exactly.
    :type resource_uid: ``str``

    :param resource_types: Optional resource types, a grant matches if its resource type is one
                           of them.
    :type resource_types: ``list``

    :param permission_types: Optional permission types used for the ``permission_types__in``
                             filter.
    :type permission_types: ``list``

    :rtype: ``list`` of :class:`PermissionGrantDB`
    """
    role_names = UserRoleAssignment.query(user=user_db.name).only('role').scalar('role')
    grant_ids_per_role = Role.query(name__in=role_names).scalar('permission_grants')

    # Each role carries its own list of grant ids. Flatten them in a single pass -
    # sum(lists, []) would copy the accumulated list again for every role.
    permission_grant_ids = [grant_id for grant_ids in grant_ids_per_role
                            for grant_id in grant_ids]

    permission_grants_filters = {'id__in': permission_grant_ids}

    # Optional filters are only applied when a non-empty value is provided
    if resource_uid:
        permission_grants_filters['resource_uid'] = resource_uid

    if resource_types:
        permission_grants_filters['resource_type__in'] = resource_types

    if permission_types:
        permission_grants_filters['permission_types__in'] = permission_types

    return PermissionGrant.query(**permission_grants_filters)
264
265
266
def create_permission_grant_for_resource_db(role_db, resource_db, permission_types):
    """
    Create a new permission grant for the provided resource and add it to the provided role.

    The permission types are validated against the resource type first, the uid and the type
    of the grant are then taken from the resource object.

    :param role_db: Role to add the permission assignment to.
    :type role_db: :class:`RoleDB`

    :param resource_db: Resource to create the permission assignment for.
    :type resource_db: :class:`StormFoundationDB`

    :param permission_types: Permission types to grant on the resource.
    :type permission_types: ``list``

    :raises ValueError: If the resource type or one of the permission types is not valid.

    :rtype: :class:`PermissionGrantDB`
    """
    validated_permission_types = _validate_permission_types(resource_db=resource_db,
                                                            permission_types=permission_types)

    return create_permission_grant(role_db=role_db,
                                   resource_uid=resource_db.get_uid(),
                                   resource_type=resource_db.get_resource_type(),
                                   permission_types=validated_permission_types)
286
287
288
def create_permission_grant(role_db, resource_uid, resource_type, permission_types):
    """
    Persist a new permission grant and add it to the provided role.

    :param role_db: Role to add the permission assignment to.
    :type role_db: :class:`RoleDB`

    :param resource_uid: Uid of the resource the grant applies to.
    :type resource_uid: ``str``

    :param resource_type: Type of the resource the grant applies to.
    :type resource_type: ``str``

    :param permission_types: Permission types which are granted.
    :type permission_types: ``list``

    :rtype: :class:`PermissionGrantDB`
    """
    # Create or update the PermissionGrantDB
    grant_db = PermissionGrant.add_or_update(
        PermissionGrantDB(resource_uid=resource_uid,
                          resource_type=resource_type,
                          permission_types=permission_types))

    # The role references the grant by its id (as a string), so that is what gets pushed
    grant_id = str(grant_db.id)
    role_db.update(push__permission_grants=grant_id)

    return grant_db
305
306
307
def remove_permission_grant_for_resource_db(role_db, resource_db, permission_types):
    """
    Remove a permission grant from a role.

    The grant is looked up by the resource uid, resource type and permission types and its id
    is then pulled from the role. The grant object itself is not deleted by this function.

    :param role_db: Role to remove the permission assignment from.
    :type role_db: :class:`RoleDB`

    :param resource_db: Resource to remove the permission assignment from.
    :type resource_db: :class:`StormFoundationDB`

    :param permission_types: Permission types of the grant which should be removed.
    :type permission_types: ``list``

    :raises ValueError: If the resource type or one of the permission types is not valid.

    :rtype: :class:`PermissionGrantDB`
    """
    validated_permission_types = _validate_permission_types(resource_db=resource_db,
                                                            permission_types=permission_types)
    grant_lookup = {
        'resource_uid': resource_db.get_uid(),
        'resource_type': resource_db.get_resource_type(),
        'permission_types': validated_permission_types
    }
    # NOTE(review): The lookup result is used without a None check - confirm what
    # PermissionGrant.get returns when no matching grant exists
    grant_db = PermissionGrant.get(**grant_lookup)

    # Remove assignment from a role
    role_db.update(pull__permission_grants=str(grant_db.id))

    return grant_db
329
330
331
def get_all_group_to_role_maps():
    """
    Return all the group to role mapping objects.

    :rtype: ``list`` of :class:`GroupToRoleMappingDB`
    """
    return GroupToRoleMapping.get_all()
334
335
336
def create_group_to_role_map(group, roles, description=None, enabled=True, source=None):
    """
    Create and persist a new group to role mapping.

    :param group: Value for the ``group`` field of the mapping.

    :param roles: Value for the ``roles`` field of the mapping.

    :param description: Optional mapping description.
    :type description: ``str``

    :param enabled: Value for the ``enabled`` field of the mapping.
    :type enabled: ``bool``

    :param source: Value for the ``source`` field of the mapping.
    :type source: ``str``

    :rtype: :class:`GroupToRoleMappingDB`
    """
    mapping_fields = {
        'group': group,
        'roles': roles,
        'source': source,
        'description': description,
        'enabled': enabled
    }
    mapping_db = GroupToRoleMappingDB(**mapping_fields)

    return GroupToRoleMapping.add_or_update(mapping_db)
346
347
348
def validate_roles_exists(role_names):
    """
    Verify that the roles with the provided names exist in the system.

    :param role_names: Names of the roles to check.
    :type role_names: ``list`` of ``str``

    :raises ValueError: For the first provided name which doesn't match an existing role.
    """
    # Build the lookup once so each membership test doesn't need to scan all the roles
    existing_role_names = set(role_db.name for role_db in get_all_roles())

    for role_name in role_names:
        if role_name not in existing_role_names:
            raise ValueError('Role "%s" doesn\'t exist in the database' % (role_name))
361
362
363
def _validate_resource_type(resource_db):
    """
    Check that permissions can be manipulated for the type of the provided resource.

    :param resource_db: Resource to check, needs to provide ``get_resource_type()``.

    :raises ValueError: If the resource type is not one of the values returned by
                        ``ResourceType.get_valid_values()``.

    :return: The provided resource object, unchanged.
    """
    resource_type = resource_db.get_resource_type()

    if resource_type in ResourceType.get_valid_values():
        return resource_db

    raise ValueError('Permissions cannot be manipulated for a resource of type: %s' %
                     (resource_type))
375
376
377
def _validate_permission_types(resource_db, permission_types):
    """
    Check that every provided permission type is valid for the type of the provided resource.

    :param resource_db: Resource the permission types are meant for. Its type is validated
                        first.

    :param permission_types: Permission types to check.
    :type permission_types: ``list``

    :raises ValueError: If the resource type is not valid or on the first permission type which
                        is not valid for the resource type.

    :return: The provided permission types, unchanged.
    """
    checked_resource_db = _validate_resource_type(resource_db=resource_db)
    allowed_permission_types = PermissionType.get_valid_permissions_for_resource_type(
        checked_resource_db.get_resource_type())

    for candidate in permission_types:
        if candidate in allowed_permission_types:
            continue

        raise ValueError('Invalid permission type: %s' % (candidate))

    return permission_types
391