byceps /
byceps
| 1 | """ |
||
| 2 | byceps.util.authz |
||
| 3 | ~~~~~~~~~~~~~~~~~ |
||
| 4 | |||
| 5 | :Copyright: 2014-2025 Jochen Kupperschmidt |
||
| 6 | :License: Revised BSD (see `LICENSE` file for details) |
||
| 7 | """ |
||
| 8 | |||
| 9 | 1 | from importlib import import_module |
|
| 10 | 1 | import pkgutil |
|
| 11 | |||
| 12 | 1 | from flask import g |
|
| 13 | 1 | from flask_babel import LazyString |
|
| 14 | |||
| 15 | 1 | from byceps.services.authz import authz_service |
|
| 16 | 1 | from byceps.services.authz.models import Permission, PermissionID |
|
| 17 | 1 | from byceps.services.user.models.user import UserID |
|
| 18 | |||
| 19 | |||
def load_permissions() -> None:
    """Import the `permissions` module of every service package.

    Each direct child of the `byceps.services` package is checked for a
    `permissions` submodule, which is imported if it exists. Importing
    is presumably what makes those modules register their permissions
    (via `register_permissions`) -- confirm against the service
    packages.

    Services without a `permissions` module are skipped silently.

    :raises ModuleNotFoundError: if a service package or its
        `permissions` module fails to import some *other* module. Such
        a failure points to a broken installation and must not be
        mistaken for "this service defines no permissions".
    """
    services_pkg_module = import_module('byceps.services')
    services_pkg_name = services_pkg_module.__name__

    service_mods = pkgutil.iter_modules(
        services_pkg_module.__path__, prefix=f'{services_pkg_name}.'
    )

    for service_mod in service_mods:
        permissions_mod_name = f'{service_mod.name}.permissions'
        try:
            import_module(permissions_mod_name)
        except ModuleNotFoundError as exc:
            # Only a missing `permissions` module itself is expected.
            # Anything else (e.g. a dependency imported by that module)
            # is a genuine error and has to surface.
            if exc.name != permissions_mod_name:
                raise
||
| 34 | 1 | ||
| 35 | 1 | ||
def register_permissions(
    group: str, names_and_labels: list[tuple[str, LazyString]]
) -> None:
    """Register a group of permissions with the module-level registry.

    Every `(name, label)` pair is registered under the permission ID
    `<group>.<name>`.
    """
    for name_and_label in names_and_labels:
        name, label = name_and_label
        permission_registry.register_permission(
            PermissionID(f'{group}.{name}'), label
        )
||
| 43 | |||
| 44 | 1 | ||
def get_permissions_for_user(user_id: UserID) -> frozenset[str]:
    """Return the registered permissions granted to the user.

    The result contains the permission IDs as strings. IDs assigned to
    the user but not known to the permission registry are left out.
    """
    known_ids = permission_registry.get_registered_permission_ids()
    granted_ids = authz_service.get_permission_ids_for_user(user_id)

    effective: set[str] = set()
    for known_id in known_ids:
        # Keep only IDs that are both registered and granted.
        if known_id in granted_ids:
            effective.add(str(known_id))

    return frozenset(effective)
|
| 58 | 1 | ||
| 59 | |||
class PermissionRegistry:
    """Keeps track of the permissions known to the application."""

    def __init__(self) -> None:
        # Maps each registered permission ID to its label.
        self._permissions: dict[PermissionID, LazyString] = {}

    def register_permission(
        self, permission_id: PermissionID, label: LazyString
    ) -> None:
        """Store the permission; an already registered ID gets the new
        label.
        """
        self._permissions.update({permission_id: label})

    def get_registered_permission_ids(self) -> frozenset[PermissionID]:
        """Return the IDs of all permissions registered so far."""
        return frozenset(self._permissions)

    def get_registered_permissions(self) -> frozenset[Permission]:
        """Return all permissions registered so far, each built from
        its ID and label.
        """
        permissions = set()
        for permission_id, label in self._permissions.items():
            permissions.add(Permission(id=permission_id, title=label))
        return frozenset(permissions)
|
| 82 | |||
| 83 | 1 | ||
# Module-level registry instance shared by `register_permissions` and
# `get_permissions_for_user`.
permission_registry = PermissionRegistry()
||
| 85 | |||
| 86 | 1 | ||
def has_current_user_permission(permission: str) -> bool:
    """Tell whether the current user (`g.user`) holds this permission."""
    granted_permissions = g.user.permissions
    return permission in granted_permissions
||
| 90 | |||
| 91 | |||
def has_current_user_any_permission(*permissions: str) -> bool:
    """Tell whether the current user holds at least one of the given
    permissions.

    Returns `False` if no permissions are given.
    """
    return any(
        has_current_user_permission(permission) for permission in permissions
    )
||
| 95 |