Completed
Push — main ( 4a33dd...b7acb4 )
by Jochen
03:55
created

_commit_ignoring_integrity_error()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 0
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.authorization.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from typing import Dict, List, Optional, Sequence, Set
10
11 1
from sqlalchemy.exc import IntegrityError
12
13 1
from ...database import db
14 1
from ...typing import UserID
15
16 1
from ..user import event_service as user_event_service
17
18 1
from .models import (
19
    Permission as DbPermission,
20
    Role as DbRole,
21
    RolePermission as DbRolePermission,
22
    UserRole as DbUserRole,
23
)
24 1
from .transfer.models import Permission, PermissionID, Role, RoleID
25
26
27 1
def create_permission(
    permission_id: PermissionID, title: str, *, ignore_if_exists: bool = False
) -> Permission:
    """Create a permission and return it as a transfer object.

    If `ignore_if_exists` is set, the commit is delegated to
    `_commit_ignoring_integrity_error()`; otherwise the session is
    committed directly.
    """
    db_permission = DbPermission(permission_id, title)
    db.session.add(db_permission)

    # Pick the commit strategy once, then run it.
    commit = (
        _commit_ignoring_integrity_error
        if ignore_if_exists
        else db.session.commit
    )
    commit()

    return _db_entity_to_permission(db_permission)
41
42
43 1
def delete_permission(permission_id: PermissionID) -> None:
    """Delete the permission with that ID and commit the session."""
    matching_permissions = (
        db.session.query(DbPermission)
        .filter_by(id=permission_id)
    )
    matching_permissions.delete()

    db.session.commit()
50
51
52 1
def create_role(
    role_id: RoleID, title: str, *, ignore_if_exists: bool = False
) -> Role:
    """Create a role and return it as a transfer object.

    If `ignore_if_exists` is set, the commit is delegated to
    `_commit_ignoring_integrity_error()`; otherwise the session is
    committed directly.
    """
    db_role = DbRole(role_id, title)
    db.session.add(db_role)

    # Pick the commit strategy once, then run it.
    commit = (
        _commit_ignoring_integrity_error
        if ignore_if_exists
        else db.session.commit
    )
    commit()

    return _db_entity_to_role(db_role)
66
67
68 1
def delete_role(role_id: RoleID) -> None:
    """Delete the role with that ID together with its permission
    assignments, then commit the session.
    """
    # Remove the role/permission links before the role itself.
    role_permission_links = (
        db.session.query(DbRolePermission)
        .filter_by(role_id=role_id)
    )
    role_permission_links.delete()

    matching_roles = (
        db.session.query(DbRole)
        .filter_by(id=role_id)
    )
    matching_roles.delete()

    db.session.commit()
79
80
81 1
def find_role(role_id: RoleID) -> Optional[Role]:
    """Look up the role by its ID.

    Returns the role as a transfer object, or `None` if no role with
    that ID is found.
    """
    db_role = DbRole.query.get(role_id)
    return None if db_role is None else _db_entity_to_role(db_role)
89
90
91 1
def find_role_ids_for_user(user_id: UserID) -> Set[RoleID]:
    """Return the IDs of the roles assigned to the user.

    Only the `role_id` column of the user/role assignments is selected,
    so no role entities have to be joined and loaded just to read
    their IDs.

    Returns an empty set if the user has no roles assigned.
    """
    rows = db.session \
        .query(DbUserRole.role_id) \
        .filter(DbUserRole.user_id == user_id) \
        .all()

    # Each row is a one-element tuple holding the role ID.
    return {row[0] for row in rows}
99
100
101 1
def find_user_ids_for_role(role_id: RoleID) -> Set[UserID]:
    """Collect the IDs of all users the role is assigned to."""
    user_id_query = db.session.query(DbUserRole.user_id)
    user_id_query = user_id_query.filter(DbUserRole.role_id == role_id)

    # Every result row carries exactly one column: the user ID.
    return {user_id for (user_id,) in user_id_query.all()}
109
110
111 1
def assign_permission_to_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Link the permission to the role and commit the session."""
    db.session.add(DbRolePermission(role_id, permission_id))
    db.session.commit()
119
120
121 1
def deassign_permission_from_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Deassign the permission from the role.

    Raises `ValueError` if no assignment exists for that combination
    of role ID and permission ID.
    """
    assignment_key = (role_id, permission_id)
    db_role_permission = DbRolePermission.query.get(assignment_key)

    if db_role_permission is None:
        raise ValueError('Unknown role ID and/or permission ID.')

    db.session.delete(db_role_permission)
    db.session.commit()
132
133
134 1
def assign_role_to_user(
    role_id: RoleID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> None:
    """Assign the role to the user and record a 'role-assigned' event.

    Does nothing if the user already has the role. The initiator's ID
    is included in the event data only when one is given.
    """
    already_assigned = _is_role_assigned_to_user(role_id, user_id)
    if already_assigned:
        return

    # Assemble the event payload up front.
    event_data = {'role_id': str(role_id)}
    if initiator_id is not None:
        event_data['initiator_id'] = str(initiator_id)

    db.session.add(DbUserRole(user_id, role_id))

    db_event = user_event_service.build_event(
        'role-assigned', user_id, event_data
    )
    db.session.add(db_event)

    # Assignment and event are persisted in the same commit.
    db.session.commit()
152
153
154 1
def deassign_role_from_user(
    role_id: RoleID, user_id: UserID, initiator_id: Optional[UserID] = None
) -> None:
    """Remove the role from the user and record a 'role-deassigned'
    event.

    Raises `ValueError` if the role is not assigned to the user. The
    initiator's ID is included in the event data only when one is
    given.
    """
    assignment_key = (user_id, role_id)
    db_user_role = DbUserRole.query.get(assignment_key)

    if db_user_role is None:
        raise ValueError('Unknown user ID and/or role ID.')

    # Assemble the event payload before touching the session.
    event_data = {'role_id': str(role_id)}
    if initiator_id is not None:
        event_data['initiator_id'] = str(initiator_id)

    db.session.delete(db_user_role)

    db_event = user_event_service.build_event(
        'role-deassigned', user_id, event_data
    )
    db.session.add(db_event)

    # Removal and event are persisted in the same commit.
    db.session.commit()
174
175
176 1
def deassign_all_roles_from_user(
    user_id: UserID, initiator_id: Optional[UserID] = None, commit: bool = True
) -> None:
    """Deassign all roles from the user.

    The assignments are removed with a single `DELETE` statement on the
    user/role table rather than by deleting entities one by one.

    `initiator_id` is accepted but currently not used by this function:
    unlike `deassign_role_from_user()`, no event is built here.

    If `commit` is false, the statement is only executed in the current
    session and committing is left to the caller.
    """
    table = DbUserRole.__table__
    delete_query = table.delete().where(table.c.user_id == user_id)
    db.session.execute(delete_query)

    if commit:
        db.session.commit()
187
188
189 1
def _is_role_assigned_to_user(role_id: RoleID, user_id: UserID) -> bool:
    """Tell whether an assignment of the role to the user exists."""
    assignment_exists = (
        DbUserRole.query
        .filter_by(role_id=role_id, user_id=user_id)
        .exists()
    )

    # Wrap the EXISTS clause in a query to obtain its scalar result.
    return db.session.query(assignment_exists).scalar()
197
198
199 1
def get_permission_ids_for_user(user_id: UserID) -> Set[PermissionID]:
    """Collect the IDs of all permissions the user holds via the roles
    assigned to it.
    """
    db_role_permissions = (
        DbRolePermission.query
        .join(DbRole)
        .join(DbUserRole)
        .filter_by(user_id=user_id)
        .all()
    )

    permission_ids = set()
    for db_role_permission in db_role_permissions:
        permission_ids.add(db_role_permission.permission_id)
    return permission_ids
210
211
212 1
def get_all_permissions_with_titles() -> Sequence[DbPermission]:
    """Fetch every permission (as database entity) with its title
    undeferred and its role permissions joined-loaded.
    """
    query = DbPermission.query
    query = query.options(
        db.undefer('title'),
        db.joinedload('role_permissions'),
    )
    return query.all()
220
221
222 1
def get_all_roles_with_titles() -> Sequence[DbRole]:
    """Fetch every role (as database entity) with its title undeferred
    and its user roles, including their users, joined-loaded.
    """
    query = DbRole.query
    query = query.options(
        db.undefer('title'),
        db.joinedload('user_roles').joinedload('user'),
    )
    return query.all()
230
231
232 1
def get_permissions_by_roles_with_titles() -> Dict[Role, Set[Permission]]:
    """Return every role mapped to the set of permissions assigned to
    it.

    Titles are undeferred to avoid lots of additional queries.
    """
    db_roles = (
        DbRole.query
        .options(db.undefer('title'))
        .all()
    )

    db_permissions = (
        DbPermission.query
        .options(
            db.undefer('title'),
            db.joinedload('role_permissions').joinedload('role'),
        )
        .all()
    )

    return _index_permissions_by_role(db_permissions, db_roles)
251
252
253 1
def get_permissions_by_roles_for_user_with_titles(
    user_id: UserID,
) -> Dict[Role, Set[Permission]]:
    """Return the user's roles, each mapped to the set of permissions
    assigned to it.

    Titles are undeferred to avoid lots of additional queries.
    """
    db_roles = (
        DbRole.query
        .options(db.undefer('title'))
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
        .all()
    )

    role_ids = {db_role.id for db_role in db_roles}

    if not role_ids:
        # No roles, hence no permissions; skip the second query.
        return _index_permissions_by_role([], db_roles)

    db_permissions = (
        DbPermission.query
        .options(
            db.undefer('title'),
            db.joinedload('role_permissions').joinedload('role'),
        )
        .join(DbRolePermission)
        .join(DbRole)
        .filter(DbRole.id.in_(role_ids))
        .all()
    )

    return _index_permissions_by_role(db_permissions, db_roles)
284
285
286 1
def _index_permissions_by_role(
    permissions: List[DbPermission], roles: List[DbRole]
) -> Dict[Role, Set[Permission]]:
    """Group the permissions by the given roles.

    Every given role gets an entry, even if none of the permissions
    belongs to it. Roles of a permission that are not among the given
    roles are ignored. The result uses transfer objects instead of
    database entities.
    """
    grouped: Dict[DbRole, Set[DbPermission]] = {}
    for db_role in roles:
        grouped[db_role] = set()

    for db_permission in permissions:
        for db_role in db_permission.roles:
            bucket = grouped.get(db_role)
            if bucket is not None:
                bucket.add(db_permission)

    # Convert database entities to transfer objects.
    permissions_by_role: Dict[Role, Set[Permission]] = {}
    for db_role, db_role_permissions in grouped.items():
        role = _db_entity_to_role(db_role)
        permissions_by_role[role] = {
            _db_entity_to_permission(db_permission)
            for db_permission in db_role_permissions
        }

    return permissions_by_role
305
306
307 1
def get_permissions_with_title_for_role(
    role_id: RoleID,
) -> Sequence[Permission]:
    """Return the permissions assigned to the role as transfer objects.

    Titles are undeferred in the query.
    """
    db_permissions = (
        DbPermission.query
        .options(db.undefer('title'))
        .join(DbRolePermission)
        .filter(DbRolePermission.role_id == role_id)
        .all()
    )

    permissions = []
    for db_permission in db_permissions:
        permissions.append(_db_entity_to_permission(db_permission))
    return permissions
320
321
322 1
def _commit_ignoring_integrity_error() -> None:
    """Commit the session; on `IntegrityError`, roll back instead of
    letting the error propagate.
    """
    session = db.session
    try:
        session.commit()
    except IntegrityError:
        # Swallow the error on purpose and discard the failed transaction.
        session.rollback()
327
328
329 1
def _db_entity_to_permission(permission: DbPermission) -> Permission:
    """Build a permission transfer object from the database entity's
    ID and title.
    """
    permission_id = permission.id
    title = permission.title
    return Permission(permission_id, title)
334
335
336 1
def _db_entity_to_role(role: DbRole) -> Role:
    """Build a role transfer object from the database entity's ID and
    title.
    """
    role_id = role.id
    title = role.title
    return Role(role_id, title)
341