Passed
Push — main ( d6e672...4ee965 )
by Jochen
05:01
created

_db_entity_to_token()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 1
dl 0
loc 7
ccs 2
cts 2
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.verification_token.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime, timedelta
11 1
from typing import Optional
12
13 1
from ...database import db
14 1
from ...typing import UserID
15
16 1
from .dbmodels import Token as DbToken
17 1
from .transfer.models import Purpose, Token
18
19
20 1
def create_for_email_address_confirmation(
    user_id: UserID, email_address: str
) -> Token:
    """Create a token for confirming an email address.

    The address to be confirmed is attached to the token as data under
    the `email_address` key.
    """
    return _create_token(
        user_id,
        Purpose.email_address_confirmation,
        data={'email_address': email_address},
    )
25
26
27 1
def create_for_password_reset(user_id: UserID) -> Token:
    """Create a token that allows the user to reset their password."""
    purpose = Purpose.password_reset
    return _create_token(user_id, purpose)
29
30
31 1
def create_for_terms_consent(user_id: UserID) -> Token:
    """Create a token for the user's consent to terms."""
    purpose = Purpose.terms_consent
    return _create_token(user_id, purpose)
33
34
35 1
def _create_token(
    user_id: UserID, purpose: Purpose, *, data: Optional[dict[str, str]] = None
) -> Token:
    """Create a token entity, commit it, and return it as a transfer object.

    `data` is optional extra information to store with the token.
    """
    db_token = DbToken(user_id, purpose, data=data)

    session = db.session
    session.add(db_token)
    session.commit()

    return _db_entity_to_token(db_token)
44
45
46 1
def delete_token(token: str) -> None:
    """Delete the stored token(s) with the given token value and commit."""
    matching_tokens = db.session.query(DbToken).filter_by(token=token)
    matching_tokens.delete()

    db.session.commit()
53
54
55 1
def find_for_email_address_confirmation_by_token(
    token_value: str,
) -> Optional[Token]:
    """Return the email address confirmation token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(
        token_value, Purpose.email_address_confirmation
    )
60
61
62 1
def find_for_password_reset_by_token(token_value: str) -> Optional[Token]:
    """Return the password reset token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(token_value, Purpose.password_reset)
65
66
67 1
def find_for_terms_consent_by_token(token_value: str) -> Optional[Token]:
    """Return the terms consent token with that value.

    Return `None` if no such token is found.
    """
    return _find_for_purpose_by_token(token_value, Purpose.terms_consent)
70
71
72 1
def _find_for_purpose_by_token(
    token_value: str, purpose: Purpose
) -> Optional[Token]:
    """Look up the token with the given value and purpose.

    Return the first match as a transfer object, or `None` if the query
    yields nothing.
    """
    query = DbToken.query.filter_by(token=token_value).for_purpose(purpose)
    db_token = query.first()

    return _db_entity_to_token(db_token) if db_token is not None else None
84
85
86 1
def _db_entity_to_token(token: DbToken) -> Token:
    """Convert the database entity into a `Token` transfer object.

    If the entity's data is `None`, an empty dict is used instead.
    """
    data = {} if token.data is None else token.data

    return Token(
        token=token.token,
        created_at=token.created_at,
        user_id=token.user_id,
        purpose=token.purpose,
        data=data,
    )
94
95
96 1
def count_tokens_by_purpose() -> dict[Purpose, int]:
    """Count verification tokens, grouped by purpose.

    Every member of `Purpose` is present in the returned dict; a purpose
    for which the query returns no row is reported with a count of zero.
    """
    rows = db.session \
        .query(
            DbToken._purpose,
            db.func.count(DbToken.token)
        ) \
        .group_by(DbToken._purpose) \
        .all()

    # Rows are (purpose name, count) pairs.
    counts_by_name = dict(rows)

    # `GROUP BY` produces no row for a purpose without any tokens, so a
    # plain subscript would raise `KeyError` for it; default to zero.
    return {
        purpose: counts_by_name.get(purpose.name, 0) for purpose in Purpose
    }
109
110
111 1
def is_expired(token: Token) -> bool:
    """Return `True` if the token has expired, i.e. it is no longer valid.

    Only password reset tokens can expire: they do so 24 hours after
    their creation. Tokens for any other purpose never expire.
    """
    if token.purpose == Purpose.password_reset:
        current_time = datetime.utcnow()
        lifetime = timedelta(hours=24)
        return (token.created_at + lifetime) <= current_time

    return False
119