Passed
Push — main ( ae658f...ebd94b )
by Jochen
04:48
created

request_email_address_confirmation()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 2
nop 3
ccs 4
cts 4
cp 1
crap 2
1
"""
byceps.services.user.creation_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2021 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""
8
9 1
from __future__ import annotations
10 1
from datetime import date, datetime
11 1
from typing import Any, Optional
12
13 1
from flask import current_app
14
15 1
from ...database import db
16 1
from ...events.user import UserAccountCreated
17 1
from ...typing import UserID
18
19 1
from ..authentication.password import service as password_service
20 1
from ..site.transfer.models import SiteID
21
22 1
from . import email_address_verification_service
23 1
from . import event_service
24 1
from .dbmodels.detail import UserDetail as DbUserDetail
25 1
from .dbmodels.user import User as DbUser
26 1
from . import service as user_service
27 1
from .transfer.models import User
28
29
30 1
class UserCreationFailed(Exception):
    """Indicate that a user account could not be created.

    Raised by `create_user` when committing the new user record to the
    database fails.
    """
32
33
34 1
def create_user(
    screen_name: Optional[str],
    email_address: Optional[str],
    password: str,
    *,
    first_names: Optional[str] = None,
    last_name: Optional[str] = None,
    date_of_birth: Optional[date] = None,
    country: Optional[str] = None,
    zip_code: Optional[str] = None,
    city: Optional[str] = None,
    street: Optional[str] = None,
    phone_number: Optional[str] = None,
    internal_comment: Optional[str] = None,
    extras: Optional[dict[str, Any]] = None,
    creator_id: Optional[UserID] = None,
    site_id: Optional[SiteID] = None,
) -> tuple[User, UserAccountCreated]:
    """Create a user account and related records.

    The user record (including its detail record) is committed first.
    Afterwards a 'user-created' event is recorded and the password hash
    is created, as both need the ID of the persisted user.

    Return the created user together with the `UserAccountCreated`
    event describing the creation.

    Raise `UserCreationFailed` if committing the user record fails.
    Raise `ValueError` (from `build_db_user`) if the screen name or
    e-mail address is invalid.
    """
    creator: Optional[User] = (
        user_service.get_user(creator_id) if creator_id is not None else None
    )

    created_at = datetime.utcnow()

    db_user = build_db_user(created_at, screen_name, email_address)

    detail = db_user.detail
    detail.first_names = first_names
    detail.last_name = last_name
    detail.date_of_birth = date_of_birth
    detail.country = country
    detail.zip_code = zip_code
    detail.city = city
    detail.street = street
    detail.phone_number = phone_number
    detail.internal_comment = internal_comment
    detail.extras = extras

    db.session.add(db_user)

    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.error('User creation failed: %s', e)
        db.session.rollback()
        # Chain explicitly so the underlying database error stays
        # attached as the cause of the domain-level exception.
        raise UserCreationFailed() from e

    user = user_service._db_entity_to_user(db_user)

    # Create event in separate step as user ID is not available earlier.
    event_data = {}
    if creator is not None:
        event_data['initiator_id'] = str(creator.id)
    if site_id is not None:
        event_data['site_id'] = site_id
    event_service.create_event(
        'user-created', user.id, event_data, occurred_at=created_at
    )

    event = UserAccountCreated(
        occurred_at=db_user.created_at,
        initiator_id=creator.id if creator else None,
        initiator_screen_name=creator.screen_name if creator else None,
        user_id=user.id,
        user_screen_name=user.screen_name,
        site_id=site_id,
    )

    # The password hash is stored separately and references the user ID.
    password_service.create_password_hash(user.id, password)

    return user, event
108
109
110 1
def build_db_user(
    created_at: datetime,
    screen_name: Optional[str],
    email_address: Optional[str],
) -> DbUser:
    """Build a user database entity with an attached detail entity.

    The screen name and e-mail address are normalized if given; `None`
    values are passed through unchanged. Nothing is added to the
    database session here.

    Raise `ValueError` if the screen name or e-mail address is invalid.
    """
    normalized_screen_name: Optional[str] = None
    if screen_name is not None:
        normalized_screen_name = _normalize_screen_name(screen_name)

    normalized_email_address: Optional[str] = None
    if email_address is not None:
        normalized_email_address = _normalize_email_address(email_address)

    user = DbUser(created_at, normalized_screen_name, normalized_email_address)

    # Instantiated only for its side effect: `create_user` relies on
    # `user.detail` being set afterwards, so constructing the detail with
    # `user=user` presumably links it back to the user (verify against
    # the `DbUserDetail` model).
    DbUserDetail(user=user)

    return user
132
133
134 1
def request_email_address_confirmation(
    user: User, email_address: str, site_id: SiteID
) -> None:
    """Send an e-mail to the user to request confirmation of the e-mail
    address.

    The address is normalized before sending. If the user has no screen
    name, the placeholder 'UnknownUser' is passed on instead.

    Raise `ValueError` if the e-mail address is invalid.
    """
    normalized_email_address = _normalize_email_address(email_address)
    screen_name = user.screen_name or 'UnknownUser'

    email_address_verification_service.send_email_address_confirmation_email(
        normalized_email_address, screen_name, user.id, site_id
    )
146
147
148 1
def _normalize_screen_name(screen_name: str) -> str:
    """Normalize the screen name, or raise an exception if invalid.

    Surrounding whitespace is stripped. The result must be non-empty
    and must contain neither whitespace of any kind nor an '@'.

    Raise `ValueError` if the screen name is invalid.
    """
    normalized = screen_name.strip()

    # `strip()` removes every kind of whitespace at the edges, so reject
    # every kind inside as well (tabs, newlines, ...), not just ' '.
    contains_whitespace = any(char.isspace() for char in normalized)

    if not normalized or contains_whitespace or ('@' in normalized):
        raise ValueError(f"Invalid screen name: '{screen_name}'")

    return normalized
156
157
158 1
def _normalize_email_address(email_address: str) -> str:
    """Normalize the e-mail address, or raise an exception if invalid.

    Surrounding whitespace is stripped and the address is lowercased.
    The result must be non-empty, must contain an '@', and must not
    contain whitespace of any kind.

    Raise `ValueError` if the e-mail address is invalid.
    """
    normalized = email_address.strip().lower()

    # `strip()` removes every kind of whitespace at the edges, so reject
    # every kind inside as well (tabs, newlines, ...), not just ' '.
    contains_whitespace = any(char.isspace() for char in normalized)

    if not normalized or contains_whitespace or ('@' not in normalized):
        raise ValueError(f"Invalid email address: '{email_address}'")

    return normalized
166