Passed
Push — main ( 4ee965...bf18b3 )
by Jochen
06:06
created

create_for_email_address_change()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.verification_token.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime, timedelta
11 1
from typing import Optional
12
13 1
from ...database import db
14 1
from ...typing import UserID
15
16 1
from .dbmodels import Token as DbToken
17 1
from .transfer.models import Purpose, Token
18
19
20 1
def create_for_email_address_change(
    user_id: UserID, new_email_address: str
) -> Token:
    """Create a token with the purpose `email_address_change`.

    The new email address is stored in the token's data under the key
    `new_email_address`.
    """
    return _create_token(
        user_id,
        Purpose.email_address_change,
        data={'new_email_address': new_email_address},
    )
25
26
27 1
def create_for_email_address_confirmation(
    user_id: UserID, email_address: str
) -> Token:
    """Create a token with the purpose `email_address_confirmation`.

    The email address is stored in the token's data under the key
    `email_address`.
    """
    return _create_token(
        user_id,
        Purpose.email_address_confirmation,
        data={'email_address': email_address},
    )
32
33
34 1
def create_for_password_reset(user_id: UserID) -> Token:
    """Create a token with the purpose `password_reset` for the user."""
    purpose = Purpose.password_reset
    return _create_token(user_id, purpose)
36
37
38 1
def create_for_terms_consent(user_id: UserID) -> Token:
    """Create a token with the purpose `terms_consent` for the user."""
    purpose = Purpose.terms_consent
    return _create_token(user_id, purpose)
40
41
42 1
def _create_token(
    user_id: UserID, purpose: Purpose, *, data: Optional[dict[str, str]] = None
) -> Token:
    """Create a token for the user and purpose, persist it, and return it.

    The database session is committed as part of this call.
    """
    db_token = DbToken(user_id, purpose, data=data)

    session = db.session
    session.add(db_token)
    session.commit()

    return _db_entity_to_token(db_token)
51
52
53 1
def delete_token(token: str) -> None:
    """Delete the token with the given value and commit the session."""
    matching_tokens = db.session.query(DbToken).filter_by(token=token)
    matching_tokens.delete()

    db.session.commit()
60
61
62 1
def find_for_email_address_change_by_token(token_value: str) -> Optional[Token]:
    """Look up the `email_address_change` token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(
        token_value, Purpose.email_address_change
    )
65
66
67 1
def find_for_email_address_confirmation_by_token(
    token_value: str,
) -> Optional[Token]:
    """Look up the `email_address_confirmation` token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(
        token_value, Purpose.email_address_confirmation
    )
72
73
74 1
def find_for_password_reset_by_token(token_value: str) -> Optional[Token]:
    """Look up the `password_reset` token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(token_value, Purpose.password_reset)
77
78
79 1
def find_for_terms_consent_by_token(token_value: str) -> Optional[Token]:
    """Look up the `terms_consent` token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(token_value, Purpose.terms_consent)
82
83
84 1
def _find_for_purpose_by_token(
    token_value: str, purpose: Purpose
) -> Optional[Token]:
    """Return the token with that value and purpose, or `None` if the
    query yields no match.
    """
    query = DbToken.query.filter_by(token=token_value).for_purpose(purpose)
    db_token = query.first()

    return _db_entity_to_token(db_token) if db_token is not None else None
96
97
98 1
def _db_entity_to_token(token: DbToken) -> Token:
    """Convert the database entity into a `Token` transfer object.

    Data that is `None` on the entity is represented as an empty
    dictionary on the resulting object.
    """
    data = token.data
    if data is None:
        data = {}

    return Token(
        token=token.token,
        created_at=token.created_at,
        user_id=token.user_id,
        purpose=token.purpose,
        data=data,
    )
106
107
108 1
def count_tokens_by_purpose() -> dict[Purpose, int]:
    """Count verification tokens, grouped by purpose.

    Every member of `Purpose` is included in the result; purposes for
    which no tokens exist are reported with a count of zero.
    """
    rows = db.session \
        .query(
            DbToken._purpose,
            db.func.count(DbToken.token)
        ) \
        .group_by(DbToken._purpose) \
        .all()

    counts_by_name = dict(rows)

    # The grouped query only yields rows for purposes that actually have
    # tokens, so fall back to zero instead of failing with a `KeyError`.
    return {
        purpose: counts_by_name.get(purpose.name, 0) for purpose in Purpose
    }
121
122
123 1
def is_expired(token: Token) -> bool:
    """Return `True` if the token has expired, i.e. it is no longer valid.

    Only tokens with the purpose `password_reset` can expire, namely
    24 hours after their creation. For any other purpose, `False` is
    returned.
    """
    if token.purpose == Purpose.password_reset:
        expires_at = token.created_at + timedelta(hours=24)
        return datetime.utcnow() >= expires_at

    return False
131