Passed
Push — main ( 95bc96...0b198e )
by Jochen
04:40
created

get_all_roles_with_permissions_and_users()   A

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 18
nop 0
dl 0
loc 26
ccs 3
cts 3
cp 1
crap 1
rs 9.5
c 0
b 0
f 0
1
"""
2
byceps.services.authorization.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from collections import defaultdict
11 1
from typing import Iterable, Optional
12
13 1
from sqlalchemy import select
14 1
from sqlalchemy.exc import IntegrityError
15
16 1
from ...database import db
17 1
from ...typing import UserID
18
19 1
from ..user import event_service as user_event_service, service as user_service
20 1
from ..user.transfer.models import User
21
22 1
from .dbmodels import (
23
    Role as DbRole,
24
    RolePermission as DbRolePermission,
25
    UserRole as DbUserRole,
26
)
27 1
from .transfer.models import PermissionID, Role, RoleID
28
29
30 1
def create_role(
    role_id: RoleID, title: str, *, ignore_if_exists: bool = False
) -> Role:
    """Create a role with the given ID and title and persist it.

    If `ignore_if_exists` is set, an integrity error raised on commit
    is not propagated; the session is rolled back instead.
    """
    db_role = DbRole(role_id, title)
    db.session.add(db_role)

    if not ignore_if_exists:
        db.session.commit()
    else:
        _commit_ignoring_integrity_error()

    return _db_entity_to_role(db_role)
44
45
46 1
def delete_role(role_id: RoleID) -> None:
    """Delete the role together with its permission assignments."""
    # The role's permission assignments are removed before the role
    # itself; both deletions are committed together.
    role_permissions = db.session.query(DbRolePermission).filter_by(
        role_id=role_id
    )
    role_permissions.delete()

    roles = db.session.query(DbRole).filter_by(id=role_id)
    roles.delete()

    db.session.commit()
57
58
59 1
def find_role(role_id: RoleID) -> Optional[Role]:
    """Look up the role by its ID; return `None` if there is no such role."""
    db_role = db.session.query(DbRole).get(role_id)

    return _db_entity_to_role(db_role) if db_role is not None else None
67
68
69 1
def find_role_ids_for_user(user_id: UserID) -> set[RoleID]:
    """Return the IDs of all roles that are assigned to the user."""
    query = db.session.query(DbRole).join(DbUserRole)
    query = query.filter(DbUserRole.user_id == user_id)

    return {db_role.id for db_role in query.all()}
78
79
80 1
def find_user_ids_for_role(role_id: RoleID) -> set[UserID]:
    """Return the IDs of all users the role is assigned to."""
    query = db.session.query(DbUserRole.user_id)
    query = query.filter(DbUserRole.role_id == role_id)

    # Each row holds a single column: the user ID.
    return {user_row[0] for user_row in query.all()}
88
89
90 1
def assign_permission_to_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Persist an assignment of the permission to the role."""
    db.session.add(DbRolePermission(role_id, permission_id))
    db.session.commit()
98
99
100 1
def deassign_permission_from_role(
    permission_id: PermissionID, role_id: RoleID
) -> None:
    """Deassign the permission from the role.

    Raise `ValueError` if no assignment exists for that combination
    of role and permission.
    """
    # The assignment is looked up by its composite key
    # `(role_id, permission_id)`.
    key = (role_id, permission_id)
    db_role_permission = db.session.query(DbRolePermission).get(key)

    if db_role_permission is None:
        raise ValueError('Unknown role ID and/or permission ID.')

    db.session.delete(db_role_permission)
    db.session.commit()
112
113
114 1
def assign_role_to_user(
    role_id: RoleID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> None:
    """Assign the role to the user and record a matching user event.

    Does nothing if the user already has the role. The initiator's ID
    is included in the event data only if one is given.
    """
    already_assigned = _is_role_assigned_to_user(role_id, user_id)
    if already_assigned:
        # Nothing to do: no new assignment, no event.
        return

    db.session.add(DbUserRole(user_id, role_id))

    details = {'role_id': str(role_id)}
    if initiator_id is not None:
        details['initiator_id'] = str(initiator_id)

    db_event = user_event_service.build_event('role-assigned', user_id, details)
    db.session.add(db_event)

    db.session.commit()
132
133
134 1
def deassign_role_from_user(
    role_id: RoleID, user_id: UserID, initiator_id: Optional[UserID] = None
) -> None:
    """Remove the role from the user and record a matching user event.

    Raise `ValueError` if no assignment exists for that combination
    of user and role. The initiator's ID is included in the event data
    only if one is given.
    """
    # The assignment is looked up by its composite key
    # `(user_id, role_id)`.
    db_user_role = db.session.query(DbUserRole).get((user_id, role_id))

    if db_user_role is None:
        message = f'Unknown role ID "{role_id}" and/or user ID "{user_id}".'
        raise ValueError(message)

    db.session.delete(db_user_role)

    details = {'role_id': str(role_id)}
    if initiator_id is not None:
        details['initiator_id'] = str(initiator_id)

    db_event = user_event_service.build_event(
        'role-deassigned', user_id, details
    )
    db.session.add(db_event)

    db.session.commit()
156
157
158 1
def deassign_all_roles_from_user(
    user_id: UserID, initiator_id: Optional[UserID] = None, commit: bool = True
) -> None:
    """Remove every role assignment of the user.

    The deletion is only committed if `commit` is true; otherwise
    committing is left to the caller.

    NOTE: `initiator_id` is accepted but not used by this function,
    and no user events are created here.
    """
    user_roles = DbUserRole.__table__
    statement = user_roles.delete().where(user_roles.c.user_id == user_id)
    db.session.execute(statement)

    if not commit:
        return

    db.session.commit()
169
170
171 1
def _is_role_assigned_to_user(role_id: RoleID, user_id: UserID) -> bool:
    """Return whether an assignment of that role to that user exists."""
    exists_clause = (
        db.session.query(DbUserRole)
        .filter_by(role_id=role_id)
        .filter_by(user_id=user_id)
        .exists()
    )

    return db.session.query(exists_clause).scalar()
180
181
182 1
def get_permission_ids_for_user(user_id: UserID) -> set[PermissionID]:
    """Return the IDs of all permissions granted to the user via the
    roles assigned to it.
    """
    query = (
        db.session.query(DbRolePermission)
        .join(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )

    return {
        db_role_permission.permission_id for db_role_permission in query.all()
    }
194
195
196 1
def get_assigned_roles_for_permissions() -> dict[PermissionID, set[RoleID]]:
    """Return role IDs grouped by the ID of the permission assigned to
    them.

    Only permissions with at least one role assignment appear as keys.
    """
    statement = select(
        DbRolePermission.permission_id, DbRolePermission.role_id
    )
    rows = db.session.execute(statement).all()

    grouped = defaultdict(set)
    for raw_permission_id, raw_role_id in rows:
        grouped[PermissionID(raw_permission_id)].add(RoleID(raw_role_id))

    # Hand out a plain dict so that lookups of unknown keys do not
    # silently create entries.
    return dict(grouped)
215
216
217 1
def get_all_role_ids() -> set[RoleID]:
    """Return the IDs of all roles as a set."""
    role_ids = db.session.execute(
        select(DbRole.id)
    ).scalars().all()

    # `.scalars().all()` yields a list; convert it to match the
    # declared return type.
    return set(role_ids)
222
223
224 1
def get_all_roles_with_permissions_and_users() -> list[
    tuple[Role, set[PermissionID], set[User]]
]:
    """Return every role paired with its permission IDs and the users
    it is assigned to.

    Role titles are undeferred, and the user assignments and their
    users are loaded via joined eager loading.
    """
    statement = select(DbRole).options(
        db.undefer(DbRole.title),
        db.joinedload(DbRole.user_roles).joinedload(DbUserRole.user),
    )
    db_roles = db.session.execute(statement).scalars().unique().all()

    roles_with_permissions_and_users = []
    for db_role in db_roles:
        role = _db_entity_to_role(db_role)
        permission_ids = {
            db_role_permission.permission_id
            for db_role_permission in db_role.role_permissions
        }
        users = {
            user_service._db_entity_to_user(db_user)
            for db_user in db_role.users
        }
        roles_with_permissions_and_users.append((role, permission_ids, users))

    return roles_with_permissions_and_users
251
252
253 1
def get_permission_ids_by_role() -> dict[Role, frozenset[PermissionID]]:
    """Return every role mapped to the IDs of its assigned permissions.

    Role titles are undeferred to avoid lots of additional queries.
    """
    roles_statement = select(DbRole).options(db.undefer(DbRole.title))
    db_roles = db.session.execute(roles_statement).scalars().unique().all()

    pairs_statement = select(
        DbRolePermission.role_id, DbRolePermission.permission_id
    )
    role_ids_and_permission_ids = db.session.execute(pairs_statement).all()

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)
268
269
270 1
def get_permission_ids_by_role_for_user(
    user_id: UserID,
) -> dict[Role, frozenset[PermissionID]]:
    """Return the roles assigned to that user, each mapped to the IDs
    of its permissions.

    Role titles are undeferred to avoid lots of additional queries.
    """
    roles_statement = (
        select(DbRole)
        .options(db.undefer(DbRole.title))
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )
    db_roles = db.session.execute(roles_statement).scalars().unique().all()

    pairs_statement = (
        select(DbRolePermission.role_id, DbRolePermission.permission_id)
        .join(DbRole)
        .join(DbUserRole)
        .filter(DbUserRole.user_id == user_id)
    )
    role_ids_and_permission_ids = db.session.execute(pairs_statement).all()

    return _index_permission_ids_by_role(role_ids_and_permission_ids, db_roles)
293
294
295 1
def _index_permission_ids_by_role(
    role_ids_and_permission_ids: Iterable[tuple[RoleID, PermissionID]],
    db_roles: Iterable[DbRole],
) -> dict[Role, frozenset[PermissionID]]:
    """Map each given role to a frozen set of its permission IDs.

    Roles for which no pair is given are mapped to an empty set.
    """
    # Group the permission IDs by role ID first.
    grouped = defaultdict(set)
    for role_id, permission_id in role_ids_and_permission_ids:
        grouped[role_id].add(permission_id)

    # Then key the frozen groups by the converted role objects.
    roles = map(_db_entity_to_role, db_roles)
    return {role: frozenset(grouped[role.id]) for role in roles}
312
313
314 1
def get_permission_ids_for_role(role_id: RoleID) -> set[PermissionID]:
    """Return the IDs of the permissions that are assigned to the role."""
    statement = select(DbRolePermission.permission_id).filter_by(
        role_id=role_id
    )
    raw_permission_ids = db.session.execute(statement).scalars().all()

    return set(map(PermissionID, raw_permission_ids))
322
323
324 1
def _commit_ignoring_integrity_error() -> None:
    """Commit the session; on `IntegrityError`, roll back instead of
    propagating the error.
    """
    session = db.session

    try:
        session.commit()
    except IntegrityError:
        session.rollback()
329
330
331 1
def _db_entity_to_role(role: DbRole) -> Role:
    """Build a `Role` transfer object from the database entity's ID
    and title.
    """
    return Role(role.id, role.title)
336