Passed
Push — main ( cb2e8b...5593c7 )
by Jochen
04:09
created

migrate_password_hash_if_outdated()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.3149

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 10
rs 10
c 0
b 0
f 0
ccs 4
cts 7
cp 0.5714
crap 2.3149
1
"""
2
byceps.services.authentication.password.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from datetime import datetime
10 1
from typing import Optional
11
12 1
from werkzeug.security import (
13
    check_password_hash as _check_password_hash,
14
    generate_password_hash as _generate_password_hash,
15
)
16
17 1
from ....database import db
18 1
from ....typing import UserID
19
20 1
from ...user import event_service as user_event_service
21
22 1
from ..session import service as session_service
23
24 1
from .dbmodels import Credential as DbCredential
25
26
27 1
PASSWORD_HASH_ITERATIONS = 250000
28 1
PASSWORD_HASH_METHOD = 'pbkdf2:sha256:%d' % PASSWORD_HASH_ITERATIONS
29
30
31 1
def generate_password_hash(password: str) -> str:
32
    """Generate a salted hash value based on the password."""
33 1
    return _generate_password_hash(password, method=PASSWORD_HASH_METHOD)
34
35
36 1
def create_password_hash(user_id: UserID, password: str) -> None:
37
    """Create a password-based credential and a session token for the user."""
38 1
    password_hash = generate_password_hash(password)
39 1
    now = datetime.utcnow()
40
41 1
    credential = DbCredential(user_id, password_hash, now)
42 1
    db.session.add(credential)
43 1
    db.session.commit()
44
45
46 1
def update_password_hash(
47
    user_id: UserID, password: str, initiator_id: UserID
48
) -> None:
49
    """Update the password hash and set a newly-generated authentication
50
    token for the user.
51
    """
52 1
    credential = _get_credential_for_user(user_id)
53
54 1
    credential.password_hash = generate_password_hash(password)
55 1
    credential.updated_at = datetime.utcnow()
56
57 1
    event = user_event_service.build_event(
58
        'password-updated',
59
        user_id,
60
        {
61
            'initiator_id': str(initiator_id),
62
        },
63
    )
64 1
    db.session.add(event)
65
66 1
    db.session.commit()
67
68 1
    session_service.delete_session_tokens_for_user(user_id)
69
70
71 1
def is_password_valid_for_user(user_id: UserID, password: str) -> bool:
72
    """Return `True` if the password is valid for the user, or `False`
73
    otherwise.
74
    """
75 1
    credential = _find_credential_for_user(user_id)
76
77 1
    if credential is None:
78
        # no password stored for user
79
        return False
80
81 1
    return check_password_hash(credential.password_hash, password)
82
83
84 1
def check_password_hash(password_hash: str, password: str) -> bool:
85
    """Hash the password and return `True` if the result matches the
86
    given hash, `False` otherwise.
87
    """
88 1
    return (password_hash is not None) and _check_password_hash(
89
        password_hash, password
90
    )
91
92
93 1
def migrate_password_hash_if_outdated(user_id: UserID, password: str) -> None:
94
    """Recreate the password hash with the current algorithm and parameters."""
95 1
    credential = _get_credential_for_user(user_id)
96
97 1
    if is_password_hash_current(credential.password_hash):
98 1
        return
99
100
    credential.password_hash = generate_password_hash(password)
101
    credential.updated_at = datetime.utcnow()
102
    db.session.commit()
103
104
105 1
def is_password_hash_current(password_hash: str) -> bool:
106
    """Return `True` if the password hash was created with the currently
107
    configured method (algorithm and parameters).
108
    """
109 1
    return password_hash.startswith(PASSWORD_HASH_METHOD + '$')
110
111
112 1
def _find_credential_for_user(user_id: UserID) -> Optional[DbCredential]:
113
    """Return the credential for the user, if found."""
114 1
    return db.session.query(DbCredential).get(user_id)
115
116
117 1
def _get_credential_for_user(user_id: UserID) -> DbCredential:
118
    """Return the credential for the user, or raise exception if not found."""
119 1
    credential = _find_credential_for_user(user_id)
120
121 1
    if credential is None:
122
        raise Exception(f'No credential found for user ID "{user_id}"')
123
124 1
    return credential
125
126
127 1
def delete_password_hash(user_id: UserID) -> None:
128
    """Delete user's credentials."""
129 1
    db.session.query(DbCredential) \
130
        .filter_by(user_id=user_id) \
131
        .delete()
132
133
    db.session.commit()
134