Passed
Push — main ( 7da116...6ee49c )
by Jochen
04:48
created

find_for_consent_by_token()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.verification_token.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime, timedelta
11 1
from typing import Optional
12
13 1
from ...database import db
14 1
from ...typing import UserID
15
16 1
from .dbmodels import Token as DbToken
17 1
from .transfer.models import Purpose, Token
18
19
20 1
def create_for_email_address_change(
    user_id: UserID, new_email_address: str
) -> Token:
    """Create a token with the purpose `email_address_change` for the user.

    The new address is attached to the token as data under the
    `new_email_address` key.
    """
    return _create_token(
        user_id,
        Purpose.email_address_change,
        data={'new_email_address': new_email_address},
    )
25
26
27 1
def create_for_email_address_confirmation(
    user_id: UserID, email_address: str
) -> Token:
    """Create a token with the purpose `email_address_confirmation` for
    the user.

    The address is attached to the token as data under the
    `email_address` key.
    """
    return _create_token(
        user_id,
        Purpose.email_address_confirmation,
        data={'email_address': email_address},
    )
32
33
34 1
def create_for_password_reset(user_id: UserID) -> Token:
    """Create a token with the purpose `password_reset` for the user."""
    purpose = Purpose.password_reset
    return _create_token(user_id, purpose)
36
37
38 1
def create_for_consent(user_id: UserID) -> Token:
    """Create a token with the purpose `consent` for the user."""
    purpose = Purpose.consent
    return _create_token(user_id, purpose)
40
41
42 1
def _create_token(
    user_id: UserID, purpose: Purpose, *, data: Optional[dict[str, str]] = None
) -> Token:
    """Persist a new token for the user and purpose, then return it as a
    transfer object.

    The optional `data` is passed through to the database entity as is.
    """
    db_token = DbToken(user_id, purpose, data=data)

    session = db.session
    session.add(db_token)
    session.commit()

    return _db_entity_to_token(db_token)
51
52
53 1
def delete_token(token: str) -> None:
    """Remove the token with the given value and commit the change."""
    matching_tokens = db.session.query(DbToken).filter_by(token=token)
    matching_tokens.delete()

    db.session.commit()
60
61
62 1
def delete_old_tokens(created_before: datetime) -> int:
    """Delete all tokens created earlier than the given point in time.

    Return how many tokens were deleted.
    """
    outdated_tokens = DbToken.query.filter(
        DbToken.created_at < created_before
    )
    deleted_count = outdated_tokens.delete()

    db.session.commit()

    return deleted_count
74
75
76 1
def find_for_email_address_change_by_token(token_value: str) -> Optional[Token]:
    """Look up the `email_address_change` token with that value.

    Return `None` if no such token exists.
    """
    return _find_for_purpose_by_token(
        token_value, Purpose.email_address_change
    )
79
80
81 1
def find_for_email_address_confirmation_by_token(
    token_value: str,
) -> Optional[Token]:
    """Look up the `email_address_confirmation` token with that value.

    Return `None` if no such token exists.
    """
    return _find_for_purpose_by_token(
        token_value, Purpose.email_address_confirmation
    )
86
87
88 1
def find_for_password_reset_by_token(token_value: str) -> Optional[Token]:
    """Look up the `password_reset` token with that value.

    Return `None` if no such token exists.
    """
    return _find_for_purpose_by_token(token_value, Purpose.password_reset)
91
92
93 1
def find_for_consent_by_token(token_value: str) -> Optional[Token]:
    """Look up the `consent` token with that value.

    Return `None` if no such token exists.
    """
    return _find_for_purpose_by_token(token_value, Purpose.consent)
96
97
98 1
def _find_for_purpose_by_token(
    token_value: str, purpose: Purpose
) -> Optional[Token]:
    """Return the token with that value and purpose as a transfer object.

    Return `None` if the query yields no match.
    """
    db_token = (
        DbToken.query
        .filter_by(token=token_value)
        .for_purpose(purpose)
        .first()
    )

    return _db_entity_to_token(db_token) if db_token is not None else None
110
111
112 1
def _db_entity_to_token(token: DbToken) -> Token:
    """Convert the database entity into a `Token` transfer object.

    A data value of `None` is replaced with an empty dict.
    """
    data = token.data
    if data is None:
        data = {}

    return Token(
        token=token.token,
        created_at=token.created_at,
        user_id=token.user_id,
        purpose=token.purpose,
        data=data,
    )
120
121
122 1
def count_tokens_by_purpose() -> dict[Purpose, int]:
    """Return the number of verification tokens for each purpose.

    Every member of `Purpose` is present in the result; purposes the
    query returns no row for are reported with a count of zero.
    """
    query = db.session.query(DbToken._purpose, db.func.count(DbToken.token))
    name_count_pairs = query.group_by(DbToken._purpose).all()

    # Rows are keyed by the stored purpose name, so look up each enum
    # member through its `name`.
    counts_by_name = dict(name_count_pairs)

    counts = {}
    for purpose in Purpose:
        counts[purpose] = counts_by_name.get(purpose.name, 0)

    return counts
135
136
137 1
def is_expired(token: Token) -> bool:
    """Tell whether the token has expired and thus is no longer valid.

    Only password reset tokens expire, namely once 24 hours have passed
    since their creation. Tokens for any other purpose never expire.
    """
    if token.purpose == Purpose.password_reset:
        expires_at = token.created_at + timedelta(hours=24)
        return datetime.utcnow() >= expires_at

    return False
145