Passed
Push — main ( 7e9c56...8b0189 )
by Jochen
03:34
created

byceps.services.user.creation_service   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 219
Duplicated Lines 0 %

Test Coverage

Coverage 89.16%

Importance

Changes 0
Metric Value
eloc 136
dl 0
loc 219
ccs 74
cts 83
cp 0.8916
rs 10
c 0
b 0
f 0
wmc 24

7 Functions

Rating   Name   Duplication   Size   Complexity  
A _normalize_email_address() 0 8 4
A _normalize_screen_name() 0 8 4
B _create_user() 0 48 6
A create_basic_user() 0 22 1
A build_user() 0 20 3
A request_email_address_confirmation() 0 10 1
B create_user() 0 53 5
1
"""
2
byceps.services.user.creation_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from datetime import datetime
10 1
from typing import Optional, Set, Tuple
11
12 1
from flask import current_app
13
14 1
from ...database import db
15 1
from ...events.user import UserAccountCreated
16 1
from ...typing import UserID
17
18 1
from ..authentication.password import service as password_service
19 1
from ..consent import consent_service
20 1
from ..consent.transfer.models import Consent
21 1
from ..newsletter import command_service as newsletter_command_service
22 1
from ..newsletter.transfer.models import Subscription as NewsletterSubscription
23 1
from ..site.transfer.models import SiteID
24
25 1
from . import email_address_verification_service
26 1
from . import event_service
27 1
from .models.detail import UserDetail as DbUserDetail
28 1
from .models.user import User as DbUser
29 1
from . import service as user_service
30 1
from .transfer.models import User
31
32
33 1
class UserCreationFailed(Exception):
34 1
    pass
35
36
37 1
def create_user(
38
    screen_name: str,
39
    email_address: str,
40
    password: str,
41
    first_names: Optional[str],
42
    last_name: Optional[str],
43
    site_id: SiteID,
44
    *,
45
    terms_consent: Optional[Consent] = None,
46
    consents: Set[Consent] = None,
47
    newsletter_subscription: Optional[NewsletterSubscription] = None,
48
) -> Tuple[User, UserAccountCreated]:
49
    """Create a user account and related records."""
50
    # user with details, password, and roles
51 1
    user, event = create_basic_user(
52
        screen_name,
53
        email_address,
54
        password,
55
        first_names=first_names,
56
        last_name=last_name,
57
    )
58
59
    # consent to terms of service
60 1
    if terms_consent:
61 1
        terms_consent = consent_service.build_consent(
62
            user.id, terms_consent.subject_id, terms_consent.expressed_at
63
        )
64 1
        db.session.add(terms_consent)
65
66
    # consents
67 1
    if consents:
68 1
        for consent in consents:
69
            # Insert missing user ID.
70 1
            consent = consent_service.build_consent(
71
                user.id,
72
                consent.subject_id,
73
                consent.expressed_at,
74
            )
75 1
            db.session.add(consent)
76
77 1
    db.session.commit()
78
79
    # newsletter subscription (optional)
80 1
    if newsletter_subscription:
81 1
        newsletter_command_service.subscribe(
82
            user.id,
83
            newsletter_subscription.list_id,
84
            newsletter_subscription.expressed_at,
85
        )
86
87 1
    request_email_address_confirmation(user, email_address, site_id)
88
89 1
    return user, event
90
91
92 1
def create_basic_user(
93
    screen_name: str,
94
    email_address: Optional[str],
95
    password: str,
96
    *,
97
    first_names: Optional[str] = None,
98
    last_name: Optional[str] = None,
99
    creator_id: Optional[UserID] = None,
100
) -> Tuple[User, UserAccountCreated]:
101
    # user with details
102 1
    user, event = _create_user(
103
        screen_name,
104
        email_address,
105
        first_names=first_names,
106
        last_name=last_name,
107
        creator_id=creator_id,
108
    )
109
110
    # password
111 1
    password_service.create_password_hash(user.id, password)
112
113 1
    return user, event
114
115
116 1
def _create_user(
117
    screen_name: Optional[str],
118
    email_address: Optional[str],
119
    *,
120
    first_names: Optional[str] = None,
121
    last_name: Optional[str] = None,
122
    creator_id: Optional[UserID] = None,
123
) -> Tuple[User, UserAccountCreated]:
124 1
    if creator_id is not None:
125
        creator = user_service.get_user(creator_id)
126
    else:
127 1
        creator = None
128
129 1
    created_at = datetime.utcnow()
130
131 1
    user = build_user(created_at, screen_name, email_address)
132
133 1
    user.detail.first_names = first_names
134 1
    user.detail.last_name = last_name
135
136 1
    db.session.add(user)
137
138 1
    try:
139 1
        db.session.commit()
140
    except Exception as e:
141
        current_app.logger.error('User creation failed: %s', e)
142
        db.session.rollback()
143
        raise UserCreationFailed()
144
145
    # Create event in separate step as user ID is not available earlier.
146 1
    event_data = {}
147 1
    if creator is not None:
148
        event_data['initiator_id'] = str(creator.id)
149 1
    event_service.create_event(
150
        'user-created', user.id, event_data, occurred_at=created_at
151
    )
152
153 1
    user_dto = user_service._db_entity_to_user(user)
154
155 1
    event = UserAccountCreated(
156
        occurred_at=user.created_at,
157
        initiator_id=creator.id if creator else None,
158
        initiator_screen_name=creator.screen_name if creator else None,
159
        user_id=user.id,
160
        user_screen_name=user.screen_name,
161
    )
162
163 1
    return user_dto, event
164
165
166 1
def build_user(
167
    created_at: datetime,
168
    screen_name: Optional[str],
169
    email_address: Optional[str],
170
) -> DbUser:
171 1
    if screen_name is not None:
172 1
        normalized_screen_name = _normalize_screen_name(screen_name)
173
    else:
174 1
        normalized_screen_name = None
175
176 1
    if email_address is not None:
177 1
        normalized_email_address = _normalize_email_address(email_address)
178
    else:
179
        normalized_email_address = None
180
181 1
    user = DbUser(created_at, normalized_screen_name, normalized_email_address)
182
183 1
    detail = DbUserDetail(user=user)
184
185 1
    return user
186
187
188 1
def request_email_address_confirmation(
189
    user: User, email_address: str, site_id: SiteID
190
) -> None:
191
    """Send an e-mail to the user to request confirmation of the e-mail
192
    address.
193
    """
194 1
    normalized_email_address = _normalize_email_address(email_address)
195
196 1
    email_address_verification_service.send_email_address_confirmation_email(
197
        normalized_email_address, user.screen_name, user.id, site_id
198
    )
199
200
201 1
def _normalize_screen_name(screen_name: str) -> str:
202
    """Normalize the screen name, or raise an exception if invalid."""
203 1
    normalized = screen_name.strip()
204
205 1
    if not normalized or (' ' in normalized) or ('@' in normalized):
206
        raise ValueError(f"Invalid screen name: '{screen_name}'")
207
208 1
    return normalized
209
210
211 1
def _normalize_email_address(email_address: str) -> str:
212
    """Normalize the e-mail address, or raise an exception if invalid."""
213 1
    normalized = email_address.strip().lower()
214
215 1
    if not normalized or (' ' in normalized) or ('@' not in normalized):
216
        raise ValueError(f"Invalid email address: '{email_address}'")
217
218
    return normalized
219