Passed
Push — main ( baf70b...d6e672 )
by Jochen
04:50
created

count_tokens_by_purpose()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 0
dl 0
loc 13
ccs 1
cts 4
cp 0.25
crap 1.4218
rs 9.95
c 0
b 0
f 0
1
"""
2
byceps.services.verification_token.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from typing import Optional
11
12 1
from ...database import db
13 1
from ...typing import UserID
14
15 1
from .dbmodels import Token as DbToken
16 1
from .transfer.models import Purpose
17
18
19 1
def create_for_email_address_confirmation(user_id: UserID) -> DbToken:
20 1
    return _create_token(user_id, Purpose.email_address_confirmation)
21
22
23 1
def create_for_password_reset(user_id: UserID) -> DbToken:
24
    return _create_token(user_id, Purpose.password_reset)
25
26
27 1
def create_for_terms_consent(user_id: UserID) -> DbToken:
28
    return _create_token(user_id, Purpose.terms_consent)
29
30
31 1
def _create_token(user_id: UserID, purpose: Purpose) -> DbToken:
32 1
    token = DbToken(user_id, purpose)
33
34 1
    db.session.add(token)
35 1
    db.session.commit()
36
37 1
    return token
38
39
40 1
def delete_token(token: str) -> None:
41
    """Delete the token."""
42 1
    db.session.query(DbToken) \
43
        .filter_by(token=token) \
44
        .delete()
45
46 1
    db.session.commit()
47
48
49 1
def find_for_email_address_confirmation_by_token(
50
    token_value: str,
51
) -> Optional[DbToken]:
52 1
    purpose = Purpose.email_address_confirmation
53 1
    return _find_for_purpose_by_token(token_value, purpose)
54
55
56 1
def find_for_password_reset_by_token(token_value: str) -> Optional[DbToken]:
57
    purpose = Purpose.password_reset
58
    return _find_for_purpose_by_token(token_value, purpose)
59
60
61 1
def find_for_terms_consent_by_token(token_value: str) -> Optional[DbToken]:
62
    purpose = Purpose.terms_consent
63
    return _find_for_purpose_by_token(token_value, purpose)
64
65
66 1
def _find_for_purpose_by_token(
67
    token_value: str, purpose: Purpose
68
) -> Optional[DbToken]:
69 1
    return DbToken.query \
70
        .filter_by(token=token_value) \
71
        .for_purpose(purpose) \
72
        .first()
73
74
75 1
def count_tokens_by_purpose() -> dict[Purpose, int]:
76
    """Count verification tokens, grouped by purpose."""
77
    rows = db.session \
78
        .query(
79
            DbToken._purpose,
80
            db.func.count(DbToken.token)
81
        ) \
82
        .group_by(DbToken._purpose) \
83
        .all()
84
85
    counts_by_name = dict(rows)
86
87
    return {purpose: counts_by_name[purpose.name] for purpose in Purpose}
88