Passed
Push — main ( d6e672...4ee965 )
by Jochen
05:01
created

byceps.services.verification_token.service   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 119
Duplicated Lines 0 %

Test Coverage

Coverage 81.25%

Importance

Changes 0
Metric Value
eloc 68
dl 0
loc 119
ccs 39
cts 48
cp 0.8125
rs 10
c 0
b 0
f 0
wmc 15

12 Functions

Rating   Name   Duplication   Size   Complexity  
A find_for_email_address_confirmation_by_token() 0 5 1
A create_for_terms_consent() 0 2 1
A create_for_email_address_confirmation() 0 5 1
A create_for_password_reset() 0 2 1
A _find_for_purpose_by_token() 0 12 2
A delete_token() 0 7 1
A is_expired() 0 8 2
A _db_entity_to_token() 0 7 2
A find_for_terms_consent_by_token() 0 3 1
A count_tokens_by_purpose() 0 13 1
A _create_token() 0 9 1
A find_for_password_reset_by_token() 0 3 1
1
"""
2
byceps.services.verification_token.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime, timedelta
11 1
from typing import Optional
12
13 1
from ...database import db
14 1
from ...typing import UserID
15
16 1
from .dbmodels import Token as DbToken
17 1
from .transfer.models import Purpose, Token
18
19
20 1
def create_for_email_address_confirmation(
21
    user_id: UserID, email_address: str
22
) -> Token:
23 1
    data = {'email_address': email_address}
24 1
    return _create_token(user_id, Purpose.email_address_confirmation, data=data)
25
26
27 1
def create_for_password_reset(user_id: UserID) -> Token:
28
    return _create_token(user_id, Purpose.password_reset)
29
30
31 1
def create_for_terms_consent(user_id: UserID) -> Token:
32
    return _create_token(user_id, Purpose.terms_consent)
33
34
35 1
def _create_token(
36
    user_id: UserID, purpose: Purpose, *, data: Optional[dict[str, str]] = None
37
) -> Token:
38 1
    token = DbToken(user_id, purpose, data=data)
39
40 1
    db.session.add(token)
41 1
    db.session.commit()
42
43 1
    return _db_entity_to_token(token)
44
45
46 1
def delete_token(token: str) -> None:
47
    """Delete the token."""
48 1
    db.session.query(DbToken) \
49
        .filter_by(token=token) \
50
        .delete()
51
52 1
    db.session.commit()
53
54
55 1
def find_for_email_address_confirmation_by_token(
56
    token_value: str,
57
) -> Optional[Token]:
58 1
    purpose = Purpose.email_address_confirmation
59 1
    return _find_for_purpose_by_token(token_value, purpose)
60
61
62 1
def find_for_password_reset_by_token(token_value: str) -> Optional[Token]:
63
    purpose = Purpose.password_reset
64
    return _find_for_purpose_by_token(token_value, purpose)
65
66
67 1
def find_for_terms_consent_by_token(token_value: str) -> Optional[Token]:
68
    purpose = Purpose.terms_consent
69
    return _find_for_purpose_by_token(token_value, purpose)
70
71
72 1
def _find_for_purpose_by_token(
73
    token_value: str, purpose: Purpose
74
) -> Optional[Token]:
75 1
    token = DbToken.query \
76
        .filter_by(token=token_value) \
77
        .for_purpose(purpose) \
78
        .first()
79
80 1
    if token is None:
81 1
        return None
82
83 1
    return _db_entity_to_token(token)
84
85
86 1
def _db_entity_to_token(token: DbToken) -> Token:
87 1
    return Token(
88
        token=token.token,
89
        created_at=token.created_at,
90
        user_id=token.user_id,
91
        purpose=token.purpose,
92
        data=token.data if token.data is not None else {},
93
    )
94
95
96 1
def count_tokens_by_purpose() -> dict[Purpose, int]:
97
    """Count verification tokens, grouped by purpose."""
98
    rows = db.session \
99
        .query(
100
            DbToken._purpose,
101
            db.func.count(DbToken.token)
102
        ) \
103
        .group_by(DbToken._purpose) \
104
        .all()
105
106
    counts_by_name = dict(rows)
107
108
    return {purpose: counts_by_name[purpose.name] for purpose in Purpose}
109
110
111 1
def is_expired(token: Token) -> bool:
112
    """Return `True` if the token has expired, i.e. it is no longer valid."""
113 1
    if token.purpose != Purpose.password_reset:
114 1
        return False
115
116 1
    now = datetime.utcnow()
117 1
    expires_after = timedelta(hours=24)
118
    return now >= (token.created_at + expires_after)
119