Passed
Push — main ( 2e04c3...4e4ece )
by Jochen
05:30
created

is_session_valid()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 4.8437

Importance

Changes 0
Metric Value
cc 4
eloc 8
nop 2
dl 0
loc 16
ccs 5
cts 8
cp 0.625
crap 4.8437
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.authentication.session.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime
11 1
from enum import Enum
12 1
from typing import Optional
13 1
from uuid import UUID, uuid4
14
15 1
from ....database import db, insert_ignore_on_conflict, upsert
16 1
from ....events.auth import UserLoggedIn
17 1
from ....typing import UserID
18
19 1
from ...site.transfer.models import SiteID
20 1
from ...user import event_service as user_event_service, service as user_service
21 1
from ...user.transfer.models import User
22
23 1
from .dbmodels.recent_login import RecentLogin as DbRecentLogin
24 1
from .dbmodels.session_token import SessionToken as DbSessionToken
25 1
from .models.current_user import CurrentUser
26
27
28 1
def get_session_token(user_id: UserID) -> DbSessionToken:
29
    """Return session token.
30
31
    Create one if none exists for the user.
32
    """
33 1
    table = DbSessionToken.__table__
34
35 1
    values = {
36
        'user_id': user_id,
37
        'token': uuid4(),
38
        'created_at': datetime.utcnow(),
39
    }
40
41 1
    insert_ignore_on_conflict(table, values)
42
43 1
    return DbSessionToken.query \
44
        .filter_by(user_id=user_id) \
45
        .one()
46
47
48 1
def delete_session_tokens_for_user(user_id: UserID) -> None:
49
    """Delete all session tokens that belong to the user."""
50 1
    db.session.query(DbSessionToken) \
51
        .filter_by(user_id=user_id) \
52
        .delete()
53 1
    db.session.commit()
54
55
56 1
def delete_all_session_tokens() -> int:
57
    """Delete all users' session tokens.
58
59
    Return the number of records deleted.
60
    """
61
    deleted_total = db.session.query(DbSessionToken).delete()
62
    db.session.commit()
63
64
    return deleted_total
65
66
67 1
def find_session_token_for_user(user_id: UserID) -> Optional[DbSessionToken]:
68
    """Return the session token for the user with that ID, or `None` if
69
    not found.
70
    """
71 1
    return DbSessionToken.query \
72
        .filter_by(user_id=user_id) \
73
        .one_or_none()
74
75
76 1
def is_session_valid(user_id: UserID, auth_token: str) -> bool:
77
    """Return `True` if the client session is valid, `False` if not."""
78 1
    if user_id is None:
79
        # User ID must not be empty.
80
        return False
81
82 1
    if not auth_token:
83
        # Authentication token must not be empty.
84
        return False
85
86 1
    if not _is_token_valid_for_user(auth_token, user_id):
87
        # Session token is unknown or the user ID provided by the
88
        # client does not match the one stored on the server.
89
        return False
90
91 1
    return True
92
93
94 1
def _is_token_valid_for_user(token: str, user_id: UserID) -> bool:
95
    """Return `True` if a session token with that ID exists for that user."""
96 1
    if not user_id:
97
        raise ValueError('User ID is invalid.')
98
99 1
    subquery = DbSessionToken.query \
100
        .filter_by(token=token, user_id=user_id) \
101
        .exists()
102
103 1
    return db.session.query(subquery).scalar()
104
105
106 1
def log_in_user(
107
    user_id: UserID, ip_address: str, *, site_id: Optional[SiteID] = None
108
) -> tuple[str, UserLoggedIn]:
109
    """Create a session token and record the log in."""
110 1
    session_token = get_session_token(user_id)
111
112 1
    occurred_at = datetime.utcnow()
113 1
    user = user_service.get_user(user_id)
114
115 1
    _create_login_event(user_id, occurred_at, ip_address, site_id=site_id)
116 1
    _record_recent_login(user_id, occurred_at)
117
118 1
    event = UserLoggedIn(
119
        occurred_at=occurred_at,
120
        initiator_id=user.id,
121
        initiator_screen_name=user.screen_name,
122
        site_id=site_id,
123
    )
124
125 1
    return session_token.token, event
126
127
128 1
def _create_login_event(
129
    user_id: UserID,
130
    occurred_at: datetime,
131
    ip_address: str,
132
    *,
133
    site_id: Optional[SiteID] = None,
134
) -> None:
135
    """Create an event that represents a user login."""
136 1
    data = {'ip_address': ip_address}
137 1
    if site_id:
138 1
        data['site_id'] = site_id
139 1
    user_event_service.create_event(
140
        'user-logged-in', user_id, data, occurred_at=occurred_at
141
    )
142
143
144 1
def find_recent_login(user_id: UserID) -> Optional[datetime]:
145
    """Return the time of the user's most recent login, if found."""
146 1
    recent_login = DbRecentLogin.query \
147
        .filter_by(user_id=user_id) \
148
        .one_or_none()
149
150 1
    if recent_login is None:
151 1
        return None
152
153 1
    return recent_login.occurred_at
154
155
156 1
def _record_recent_login(user_id: UserID, occurred_at: datetime) -> None:
157
    """Store the time of the user's most recent login."""
158 1
    table = DbRecentLogin.__table__
159 1
    identifier = {'user_id': user_id}
160 1
    replacement = {'occurred_at': occurred_at}
161
162 1
    upsert(table, identifier, replacement)
163
164
165 1
ANONYMOUS_USER_ID = UserID(UUID('00000000-0000-0000-0000-000000000000'))
166
167
168 1
def get_anonymous_current_user(locale: Optional[str]) -> CurrentUser:
169
    """Return an anonymous current user object."""
170 1
    return CurrentUser(
171
        id=ANONYMOUS_USER_ID,
172
        screen_name=None,
173
        suspended=False,
174
        deleted=False,
175
        locale=locale,
176
        avatar_url=None,
177
        is_orga=False,
178
        authenticated=False,
179
        permissions=frozenset(),
180
    )
181
182
183 1
def get_authenticated_current_user(
184
    user: User,
185
    locale: Optional[str],
186
    permissions: Optional[frozenset[Enum]],
187
) -> CurrentUser:
188
    """Return an authenticated current user object."""
189 1
    return CurrentUser(
190
        id=user.id,
191
        screen_name=user.screen_name,
192
        suspended=False,  # Current user cannot be suspended.
193
        deleted=False,  # Current user cannot be deleted.
194
        locale=locale,
195
        avatar_url=user.avatar_url,
196
        is_orga=user.is_orga,
197
        authenticated=True,
198
        permissions=permissions,
199
    )
200