Passed
Push — main ( cf3570...95bc96 )
by Jochen
04:50
created

_index_permission_ids_by_role()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 12
nop 2
dl 0
loc 17
rs 9.8
c 0
b 0
f 0
ccs 10
cts 10
cp 1
crap 3
1
"""
2
byceps.services.authorization.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from collections import defaultdict
11 1
from typing import Iterable, Optional, Sequence
12
13 1
from sqlalchemy import select
14 1
from sqlalchemy.exc import IntegrityError
15
16 1
from ...database import db
17 1
from ...typing import UserID
18
19 1
from ..user import event_service as user_event_service
20
21 1
from .dbmodels import (
22
    Permission as DbPermission,
23
    Role as DbRole,
24
    RolePermission as DbRolePermission,
25
    UserRole as DbUserRole,
26
)
27 1
from .transfer.models import Permission, PermissionID, Role, RoleID
28
29
30 1
def create_permission(
    permission_id: PermissionID, title: str, *, ignore_if_exists: bool = False
) -> Permission:
    """Persist a new permission and return it as a transfer object.

    If `ignore_if_exists` is set, an integrity error raised by the
    commit (presumably because the permission already exists) is
    ignored and the session is rolled back instead.
    """
    db_permission = DbPermission(permission_id, title)
    db.session.add(db_permission)

    # Pick the commit strategy up front, then run it.
    commit = (
        _commit_ignoring_integrity_error
        if ignore_if_exists
        else db.session.commit
    )
    commit()

    return _db_entity_to_permission(db_permission)
44
45
46 1
def delete_permission(permission_id: PermissionID) -> None:
    """Delete the permission with that ID and commit the session."""
    matching_permissions = db.session.query(DbPermission).filter_by(
        id=permission_id
    )
    matching_permissions.delete()

    db.session.commit()
53
54
55 1
def create_role(
    role_id: RoleID, title: str, *, ignore_if_exists: bool = False
) -> Role:
    """Persist a new role and return it as a transfer object.

    If `ignore_if_exists` is set, an integrity error raised by the
    commit (presumably because the role already exists) is ignored
    and the session is rolled back instead.
    """
    db_role = DbRole(role_id, title)
    db.session.add(db_role)

    # Pick the commit strategy up front, then run it.
    commit = (
        _commit_ignoring_integrity_error
        if ignore_if_exists
        else db.session.commit
    )
    commit()

    return _db_entity_to_role(db_role)
69
70
71 1
def delete_role(role_id: RoleID) -> None:
    """Delete a role together with its permission assignments."""
    # The role's permission assignments are removed before the role
    # itself; both deletions are committed together.
    role_permissions = db.session.query(DbRolePermission).filter_by(
        role_id=role_id
    )
    role_permissions.delete()

    roles = db.session.query(DbRole).filter_by(id=role_id)
    roles.delete()

    db.session.commit()
82
83
84 1
def find_role(role_id: RoleID) -> Optional[Role]:
    """Look up the role by its ID.

    Return the role as a transfer object, or `None` if no role with
    that ID exists.
    """
    db_role = db.session.query(DbRole).get(role_id)
    return None if db_role is None else _db_entity_to_role(db_role)
92
93
94 1
def find_role_ids_for_user(user_id: UserID) -> set[RoleID]:
    """Collect the IDs of all roles assigned to the user."""
    query = (
        db.session.query(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )

    return {db_role.id for db_role in query.all()}
103
104
105 1
def find_user_ids_for_role(role_id: RoleID) -> set[UserID]:
    """Collect the IDs of all users the role is assigned to."""
    query = db.session.query(DbUserRole.user_id).filter(
        DbUserRole.role_id == role_id
    )

    # Each result row holds exactly one column: the user ID.
    return {assigned_user_id for (assigned_user_id,) in query.all()}
113
114
115 1
def assign_permission_to_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Link the permission to the role and commit the session."""
    db.session.add(DbRolePermission(role_id, permission_id))
    db.session.commit()
123
124
125 1
def deassign_permission_from_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Deassign the permission from the role.

    Raise `ValueError` if no assignment of that permission to that
    role is found.
    """
    # The assignment is looked up by its composite key, role ID first.
    role_permission = db.session.query(DbRolePermission).get(
        (role_id, permission_id)
    )

    if role_permission is None:
        # Name the offending IDs, as `deassign_role_from_user` does.
        raise ValueError(
            f'Unknown role ID "{role_id}" '
            f'and/or permission ID "{permission_id}".'
        )

    db.session.delete(role_permission)
    db.session.commit()
137
138
139 1
def assign_role_to_user(
    role_id: RoleID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> None:
    """Assign the role to the user and record a `role-assigned` event.

    Does nothing if the user already has the role. The initiator's
    ID, if given, is stored in the event data.
    """
    if _is_role_assigned_to_user(role_id, user_id):
        # Already assigned; neither an assignment nor an event is added.
        return

    db.session.add(DbUserRole(user_id, role_id))

    data = {'role_id': str(role_id)}
    if initiator_id is not None:
        data['initiator_id'] = str(initiator_id)

    # Assignment and event are committed together.
    db.session.add(
        user_event_service.build_event('role-assigned', user_id, data)
    )
    db.session.commit()
157
158
159 1
def deassign_role_from_user(
    role_id: RoleID, user_id: UserID, initiator_id: Optional[UserID] = None
) -> None:
    """Remove the role from the user and record a `role-deassigned`
    event.

    Raise `ValueError` if the role is not assigned to the user. The
    initiator's ID, if given, is stored in the event data.
    """
    # The assignment is looked up by its composite key, user ID first.
    assignment = db.session.query(DbUserRole).get((user_id, role_id))
    if assignment is None:
        raise ValueError(
            f'Unknown role ID "{role_id}" and/or user ID "{user_id}".'
        )

    db.session.delete(assignment)

    data = {'role_id': str(role_id)}
    if initiator_id is not None:
        data['initiator_id'] = str(initiator_id)

    # Deletion and event are committed together.
    db.session.add(
        user_event_service.build_event('role-deassigned', user_id, data)
    )
    db.session.commit()
181
182
183 1
def deassign_all_roles_from_user(
    user_id: UserID, initiator_id: Optional[UserID] = None, commit: bool = True
) -> None:
    """Remove every role assignment of the user.

    The session is only committed if `commit` is true, which lets a
    caller run the deletion as part of its own transaction.

    NOTE(review): `initiator_id` is accepted but not used here, and
    no event is recorded.
    """
    user_roles = DbUserRole.__table__
    statement = user_roles.delete().where(user_roles.c.user_id == user_id)
    db.session.execute(statement)

    if not commit:
        return

    db.session.commit()
194
195
196 1
def _is_role_assigned_to_user(role_id: RoleID, user_id: UserID) -> bool:
    """Return whether the user currently has the role assigned."""
    # Ask the database for existence only instead of loading the row.
    assignment_exists = (
        db.session.query(DbUserRole)
        .filter_by(role_id=role_id, user_id=user_id)
        .exists()
    )

    return db.session.query(assignment_exists).scalar()
205
206
207 1
def get_permission_ids_for_user(user_id: UserID) -> set[PermissionID]:
    """Collect the IDs of all permissions granted to the user via the
    roles assigned to it.
    """
    query = (
        db.session.query(DbRolePermission)
        .join(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )

    return {
        role_permission.permission_id for role_permission in query.all()
    }
219
220
221 1
def get_assigned_roles_for_permissions() -> dict[PermissionID, set[RoleID]]:
    """Map each permission ID to the IDs of the roles it is assigned
    to.

    Permissions that are not assigned to any role do not appear in
    the result.
    """
    statement = select(
        DbRolePermission.permission_id, DbRolePermission.role_id
    )
    rows = db.session.execute(statement).all()

    role_ids_by_permission_id: dict[PermissionID, set[RoleID]] = {}
    for raw_permission_id, raw_role_id in rows:
        role_ids = role_ids_by_permission_id.setdefault(
            PermissionID(raw_permission_id), set()
        )
        role_ids.add(RoleID(raw_role_id))

    return role_ids_by_permission_id
240
241
242 1
def get_all_roles_with_titles() -> Sequence[DbRole]:
    """Fetch all roles with their titles undeferred.

    The roles' user assignments and the assigned users are loaded
    eagerly (joined) as well.
    """
    with_title = db.undefer(DbRole.title)
    with_users = db.joinedload(DbRole.user_roles).joinedload(DbUserRole.user)

    query = db.session.query(DbRole).options(with_title, with_users)
    return query.all()
252
253
254 1
def get_permission_ids_by_role() -> dict[Role, frozenset[PermissionID]]:
    """Map every role to the IDs of the permissions assigned to it.

    Role titles are undeferred to avoid lots of additional queries.
    """
    roles_statement = select(DbRole).options(db.undefer(DbRole.title))
    db_roles = db.session.execute(roles_statement).scalars().unique().all()

    pairs_statement = select(
        DbRolePermission.role_id, DbRolePermission.permission_id
    )
    role_ids_and_permission_ids = db.session.execute(pairs_statement).all()

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)
269
270
271 1
def get_permission_ids_by_role_for_user(
    user_id: UserID,
) -> dict[Role, frozenset[PermissionID]]:
    """Map each role assigned to the user to the IDs of the
    permissions assigned to that role.

    Role titles are undeferred to avoid lots of additional queries.
    """
    roles_statement = (
        select(DbRole)
        .options(db.undefer(DbRole.title))
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )
    db_roles = db.session.execute(roles_statement).scalars().unique().all()

    pairs_statement = (
        select(DbRolePermission.role_id, DbRolePermission.permission_id)
        .join(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )
    role_ids_and_permission_ids = db.session.execute(pairs_statement).all()

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)
294
295
296 1
def _index_permission_ids_by_role(
    role_ids_and_permission_ids: Iterable[tuple[RoleID, PermissionID]],
    db_roles: Iterable[DbRole],
) -> dict[Role, frozenset[PermissionID]]:
    """Group the permission IDs by the role they belong to.

    Every role in `db_roles` gets an entry; a role with no matching
    pair is mapped to an empty frozenset. Pairs whose role ID matches
    none of the given roles are dropped.
    """
    grouped: dict[RoleID, set[PermissionID]] = {}
    for role_id, permission_id in role_ids_and_permission_ids:
        grouped.setdefault(role_id, set()).add(permission_id)

    roles = [_db_entity_to_role(db_role) for db_role in db_roles]

    return {role: frozenset(grouped.get(role.id, ())) for role in roles}
313
314
315 1
def get_permission_ids_for_role(role_id: RoleID) -> set[PermissionID]:
    """Collect the IDs of the permissions assigned to the role."""
    statement = select(DbRolePermission.permission_id).filter_by(
        role_id=role_id
    )
    raw_permission_ids = db.session.execute(statement).scalars().all()

    return set(map(PermissionID, raw_permission_ids))
323
324
325 1
def _commit_ignoring_integrity_error() -> None:
    """Commit the session; on `IntegrityError`, roll back instead of
    propagating the error.
    """
    session = db.session

    try:
        session.commit()
    except IntegrityError:
        # Deliberately swallowed; the rollback leaves the session usable.
        session.rollback()
330
331
332 1
def _db_entity_to_permission(permission: DbPermission) -> Permission:
    """Build a `Permission` transfer object from the database entity's
    ID and title.
    """
    return Permission(permission.id, permission.title)
337
338
339 1
def _db_entity_to_role(role: DbRole) -> Role:
    """Build a `Role` transfer object from the database entity's ID
    and title.
    """
    return Role(role.id, role.title)
344