1
|
|
|
""" |
2
|
|
|
byceps.util.user_session |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2021 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from __future__ import annotations |
10
|
1 |
|
from enum import Enum |
11
|
1 |
|
from typing import Optional |
12
|
1 |
|
from uuid import UUID |
13
|
|
|
|
14
|
1 |
|
from babel import parse_locale |
15
|
1 |
|
from flask import session |
16
|
|
|
|
17
|
1 |
|
from ..services.authentication.session.models.current_user import CurrentUser |
18
|
1 |
|
from ..services.authentication.session import service as session_service |
19
|
1 |
|
from ..services.user import service as user_service |
20
|
1 |
|
from ..services.user.transfer.models import User |
21
|
1 |
|
from ..typing import PartyID, UserID |
22
|
|
|
|
23
|
1 |
|
from .authorization import get_permissions_for_user |
24
|
|
|
|
25
|
|
|
|
26
|
1 |
|
KEY_LOCALE = 'locale' |
27
|
1 |
|
KEY_USER_ID = 'user_id' |
28
|
1 |
|
KEY_USER_AUTH_TOKEN = 'user_auth_token' |
29
|
|
|
|
30
|
|
|
|
31
|
1 |
|
def start(user_id: UserID, auth_token: str, *, permanent: bool = False) -> None: |
32
|
|
|
"""Initialize the user's session by putting the relevant data |
33
|
|
|
into the session cookie. |
34
|
|
|
""" |
35
|
1 |
|
session.clear() |
36
|
|
|
|
37
|
1 |
|
session[KEY_USER_ID] = str(user_id) |
38
|
1 |
|
session[KEY_USER_AUTH_TOKEN] = str(auth_token) |
39
|
1 |
|
session.permanent = permanent |
40
|
|
|
|
41
|
|
|
|
42
|
1 |
|
def end() -> None: |
43
|
|
|
"""End the user's session by deleting the session cookie.""" |
44
|
|
|
session.pop(KEY_USER_ID, None) |
45
|
|
|
session.pop(KEY_USER_AUTH_TOKEN, None) |
46
|
|
|
session.permanent = False |
47
|
|
|
|
48
|
|
|
|
49
|
1 |
|
def get_current_user( |
50
|
|
|
required_permissions: set[Enum], |
51
|
|
|
*, |
52
|
|
|
party_id: Optional[PartyID] = None, |
53
|
|
|
) -> CurrentUser: |
54
|
1 |
|
session_locale = _get_session_locale() |
55
|
|
|
|
56
|
1 |
|
user = _find_user(party_id=party_id) |
57
|
1 |
|
if user is None: |
58
|
1 |
|
return session_service.get_anonymous_current_user(session_locale) |
59
|
|
|
|
60
|
1 |
|
permissions = get_permissions_for_user(user.id) |
61
|
1 |
|
if not required_permissions.issubset(permissions): |
62
|
|
|
return session_service.get_anonymous_current_user(session_locale) |
63
|
|
|
|
64
|
1 |
|
locale = _get_user_locale(user) or session_locale |
65
|
|
|
|
66
|
1 |
|
return session_service.get_authenticated_current_user( |
67
|
|
|
user, locale, permissions |
68
|
|
|
) |
69
|
|
|
|
70
|
|
|
|
71
|
1 |
|
def _find_user(*, party_id: Optional[PartyID] = None) -> Optional[User]: |
72
|
|
|
"""Return the current user if authenticated, `None` if not. |
73
|
|
|
|
74
|
|
|
Return `None` if: |
75
|
|
|
- the ID is unknown. |
76
|
|
|
- the account is not enabled. |
77
|
|
|
- the auth token is invalid. |
78
|
|
|
""" |
79
|
1 |
|
user_id_str = session.get(KEY_USER_ID) |
80
|
1 |
|
auth_token = session.get(KEY_USER_AUTH_TOKEN) |
81
|
|
|
|
82
|
1 |
|
if user_id_str is None: |
83
|
1 |
|
return None |
84
|
|
|
|
85
|
1 |
|
try: |
86
|
1 |
|
user_id = UserID(UUID(user_id_str)) |
87
|
|
|
except ValueError: |
88
|
|
|
return None |
89
|
|
|
|
90
|
1 |
|
user = user_service.find_active_user( |
91
|
|
|
user_id, include_avatar=True, include_orga_flag_for_party_id=party_id |
92
|
|
|
) |
93
|
|
|
|
94
|
1 |
|
if user is None: |
95
|
|
|
return None |
96
|
|
|
|
97
|
|
|
# Validate auth token. |
98
|
1 |
|
if (auth_token is None) or not session_service.is_session_valid( |
99
|
|
|
user.id, auth_token |
100
|
|
|
): |
101
|
|
|
# Bad auth token, not logging in. |
102
|
|
|
return None |
103
|
|
|
|
104
|
1 |
|
return user |
105
|
|
|
|
106
|
|
|
|
107
|
1 |
|
def _get_session_locale() -> Optional[str]: |
108
|
|
|
"""Return the locale set in the session, if any.""" |
109
|
1 |
|
return session.get(KEY_LOCALE) |
110
|
|
|
|
111
|
|
|
|
112
|
1 |
|
def _get_user_locale(user: User) -> Optional[str]: |
113
|
|
|
"""Return the locale set for the user, if any.""" |
114
|
1 |
|
locale_str = user.locale |
115
|
1 |
|
if not locale_str: |
116
|
1 |
|
return None |
117
|
|
|
|
118
|
|
|
try: |
119
|
|
|
parse_locale(locale_str) |
120
|
|
|
except ValueError: |
121
|
|
|
return None |
122
|
|
|
|
123
|
|
|
return locale_str |
124
|
|
|
|
125
|
|
|
|
126
|
1 |
|
def set_locale(locale: str) -> None: |
127
|
|
|
"""Set locale for the session.""" |
128
|
|
|
session[KEY_LOCALE] = locale |
129
|
|
|
|