Passed
Push — main ( 7e9c56...8b0189 )
by Jochen
03:34
created

create_user()   B

Complexity

Conditions 5

Size

Total Lines 53
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 36
nop 10
dl 0
loc 53
ccs 14
cts 14
cp 1
crap 5
rs 8.5493
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1
"""
2
byceps.services.user.creation_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from datetime import datetime
10 1
from typing import Optional, Set, Tuple
11
12 1
from flask import current_app
13
14 1
from ...database import db
15 1
from ...events.user import UserAccountCreated
16 1
from ...typing import UserID
17
18 1
from ..authentication.password import service as password_service
19 1
from ..consent import consent_service
20 1
from ..consent.transfer.models import Consent
21 1
from ..newsletter import command_service as newsletter_command_service
22 1
from ..newsletter.transfer.models import Subscription as NewsletterSubscription
23 1
from ..site.transfer.models import SiteID
24
25 1
from . import email_address_verification_service
26 1
from . import event_service
27 1
from .models.detail import UserDetail as DbUserDetail
28 1
from .models.user import User as DbUser
29 1
from . import service as user_service
30 1
from .transfer.models import User
31
32
33 1
class UserCreationFailed(Exception):
    """Raised when a new user could not be persisted to the database."""
35
36
37 1
def create_user(
    screen_name: str,
    email_address: str,
    password: str,
    first_names: Optional[str],
    last_name: Optional[str],
    site_id: SiteID,
    *,
    terms_consent: Optional[Consent] = None,
    consents: Optional[Set[Consent]] = None,
    newsletter_subscription: Optional[NewsletterSubscription] = None,
) -> Tuple[User, UserAccountCreated]:
    """Create a user account and related records.

    Steps, in order:

    1. Create the user (with details and password) via
       `create_basic_user`.
    2. Store the consent to the terms of service (if given) and any
       further consents (if given), then commit.
    3. Subscribe the user to the newsletter (if requested).
    4. Request confirmation of the e-mail address for the given site.

    Return the created user and the corresponding
    `UserAccountCreated` event.

    Raise `UserCreationFailed` if the user could not be persisted, or
    `ValueError` if the screen name or e-mail address is invalid (both
    propagate from the user-building helpers).
    """
    # user with details and password
    user, event = create_basic_user(
        screen_name,
        email_address,
        password,
        first_names=first_names,
        last_name=last_name,
    )

    # Consent to terms of service goes first, followed by any other
    # consents. The given objects lack the ID of the user that was just
    # created, so each one is rebuilt with it before being stored.
    pending_consents = []
    if terms_consent:
        pending_consents.append(terms_consent)
    if consents:
        pending_consents.extend(consents)

    for pending_consent in pending_consents:
        db_consent = consent_service.build_consent(
            user.id,
            pending_consent.subject_id,
            pending_consent.expressed_at,
        )
        db.session.add(db_consent)

    db.session.commit()

    # newsletter subscription (optional)
    if newsletter_subscription:
        newsletter_command_service.subscribe(
            user.id,
            newsletter_subscription.list_id,
            newsletter_subscription.expressed_at,
        )

    request_email_address_confirmation(user, email_address, site_id)

    return user, event
90
91
92 1
def create_basic_user(
    screen_name: str,
    email_address: Optional[str],
    password: str,
    *,
    first_names: Optional[str] = None,
    last_name: Optional[str] = None,
    creator_id: Optional[UserID] = None,
) -> Tuple[User, UserAccountCreated]:
    """Create a user account with details and a password.

    No consents, newsletter subscription, or e-mail address
    confirmation request are created here.

    Return the created user and the corresponding
    `UserAccountCreated` event.

    Raise `UserCreationFailed` if the user could not be persisted.
    """
    # user with details
    user, event = _create_user(
        screen_name,
        email_address,
        first_names=first_names,
        last_name=last_name,
        creator_id=creator_id,
    )

    # password
    password_service.create_password_hash(user.id, password)

    return user, event
114
115
116 1
def _create_user(
    screen_name: Optional[str],
    email_address: Optional[str],
    *,
    first_names: Optional[str] = None,
    last_name: Optional[str] = None,
    creator_id: Optional[UserID] = None,
) -> Tuple[User, UserAccountCreated]:
    """Persist a new user with details and record its creation.

    If `creator_id` is given, the creating user is looked up and
    referenced as initiator in both the stored event and the returned
    `UserAccountCreated` event.

    Return the created user (as transfer object) and the event.

    Raise `UserCreationFailed` if committing the user fails; the
    original exception is attached as its cause.
    """
    creator = (
        user_service.get_user(creator_id) if creator_id is not None else None
    )

    created_at = datetime.utcnow()

    user = build_user(created_at, screen_name, email_address)

    user.detail.first_names = first_names
    user.detail.last_name = last_name

    db.session.add(user)

    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.error('User creation failed: %s', e)
        db.session.rollback()
        # Chain the original error so the root cause stays visible to
        # whoever handles `UserCreationFailed`.
        raise UserCreationFailed() from e

    # Create event in separate step as user ID is not available earlier.
    event_data = {}
    if creator is not None:
        event_data['initiator_id'] = str(creator.id)
    event_service.create_event(
        'user-created', user.id, event_data, occurred_at=created_at
    )

    user_dto = user_service._db_entity_to_user(user)

    event = UserAccountCreated(
        occurred_at=user.created_at,
        initiator_id=creator.id if creator else None,
        initiator_screen_name=creator.screen_name if creator else None,
        user_id=user.id,
        user_screen_name=user.screen_name,
    )

    return user_dto, event
164
165
166 1
def build_user(
    created_at: datetime,
    screen_name: Optional[str],
    email_address: Optional[str],
) -> DbUser:
    """Build a user entity (with a detail record) without persisting it.

    Screen name and e-mail address are normalized if given; `None` is
    passed through unchanged.

    Raise `ValueError` if a given screen name or e-mail address is
    invalid.
    """
    normalized_screen_name = (
        _normalize_screen_name(screen_name)
        if screen_name is not None
        else None
    )

    normalized_email_address = (
        _normalize_email_address(email_address)
        if email_address is not None
        else None
    )

    user = DbUser(created_at, normalized_screen_name, normalized_email_address)

    # Constructed for its side effect only; the result is not needed here.
    # NOTE(review): presumably this links the detail record to the user
    # through the ORM relationship (callers access `user.detail`
    # afterwards) -- confirm against the model definitions.
    DbUserDetail(user=user)

    return user
186
187
188 1
def request_email_address_confirmation(
    user: User, email_address: str, site_id: SiteID
) -> None:
    """Send an e-mail to the user to request confirmation of the e-mail
    address.

    The address is normalized before it is handed to the e-mail address
    verification service.

    Raise `ValueError` if the e-mail address is invalid.
    """
    normalized_email_address = _normalize_email_address(email_address)

    email_address_verification_service.send_email_address_confirmation_email(
        normalized_email_address, user.screen_name, user.id, site_id
    )
199
200
201 1
def _normalize_screen_name(screen_name: str) -> str:
202
    """Normalize the screen name, or raise an exception if invalid."""
203 1
    normalized = screen_name.strip()
204
205 1
    if not normalized or (' ' in normalized) or ('@' in normalized):
206
        raise ValueError(f"Invalid screen name: '{screen_name}'")
207
208 1
    return normalized
209
210
211 1
def _normalize_email_address(email_address: str) -> str:
212
    """Normalize the e-mail address, or raise an exception if invalid."""
213 1
    normalized = email_address.strip().lower()
214
215 1
    if not normalized or (' ' in normalized) or ('@' not in normalized):
216
        raise ValueError(f"Invalid email address: '{email_address}'")
217
218
    return normalized
219