Passed
Push — master ( 74655b...84d640 )
by Jochen
02:20
created

byceps.blueprints.authentication.views   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 327
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 188
dl 0
loc 327
rs 8.8798
c 0
b 0
f 0
wmc 44

15 Functions

Rating   Name   Duplication   Size   Complexity  
A password_update() 0 16 2
A password_update_form() 0 9 2
A _get_required_consent_subject_ids() 0 17 3
A login_form() 0 26 4
A _is_consent_required() 0 8 3
A password_reset_form() 0 15 2
A _is_login_allowed() 0 3 1
C login() 0 53 10
B request_password_reset() 0 35 5
A password_reset() 0 19 2
A request_password_reset_form() 0 7 2
A logout() 0 6 1
A _verify_password_reset_token() 0 7 3
A _is_user_account_creation_enabled() 0 6 2
A _get_current_user_or_404() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like byceps.blueprints.authentication.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.blueprints.authentication.views
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2019 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from flask import abort, g, request, url_for
10
11
from ...config import get_site_mode
12
from ...services.authentication.exceptions import AuthenticationFailed
13
from ...services.authentication import service as authentication_service
14
from ...services.authentication.password import service as password_service
15
from ...services.authentication.password import (
16
    reset_service as password_reset_service,
17
)
18
from ...services.authentication.session import service as session_service
19
from ...services.consent import consent_service
20
from ...services.email import service as email_service
21
from ...services.site import (
22
    service as site_service,
23
    settings_service as site_settings_service,
24
)
25
from ...services.terms import version_service as terms_version_service
26
from ...services.user import service as user_service
27
from ...services.verification_token import service as verification_token_service
28
from ...util.framework.blueprint import create_blueprint
29
from ...util.framework.flash import flash_error, flash_notice, flash_success
30
from ...util.framework.templating import templated
31
from ...util.views import redirect_to, respond_no_content
32
33
from ..admin.core.authorization import AdminPermission
34
from ..user.creation.views import _find_privacy_policy_consent_subject_id
35
36
from .forms import (
37
    LoginForm,
38
    RequestPasswordResetForm,
39
    ResetPasswordForm,
40
    UpdatePasswordForm,
41
)
42
from . import service, session as user_session
43
44
45
blueprint = create_blueprint('authentication', __name__)
46
47
48
# -------------------------------------------------------------------- #
49
# log in/out
50
51
52
@blueprint.route('/login')
53
@templated
54
def login_form():
55
    """Show login form."""
56
    logged_in = g.current_user.is_active
57
    if logged_in:
58
        flash_notice(
59
            f'Du bist bereits als Benutzer "{g.current_user.screen_name}" '
60
            'angemeldet.'
61
        )
62
63
    in_admin_mode = get_site_mode().is_admin()
64
65
    if not in_admin_mode and not _is_login_allowed():
66
        return {
67
            'login_enabled': False,
68
        }
69
70
    form = LoginForm()
71
    user_account_creation_enabled = _is_user_account_creation_enabled(in_admin_mode)
72
73
    return {
74
        'login_enabled': True,
75
        'logged_in': logged_in,
76
        'form': form,
77
        'user_account_creation_enabled': user_account_creation_enabled,
78
    }
79
80
81
@blueprint.route('/login', methods=['POST'])
82
@respond_no_content
83
def login():
84
    """Allow the user to authenticate with e-mail address and password."""
85
    if g.current_user.is_active:
86
        return
87
88
    in_admin_mode = get_site_mode().is_admin()
89
90
    if not in_admin_mode and not _is_login_allowed():
91
        abort(403, 'Log in to this site is generally disabled.')
92
93
    form = LoginForm(request.form)
94
95
    screen_name = form.screen_name.data.strip()
96
    password = form.password.data
97
    permanent = form.permanent.data
98
    if not all([screen_name, password]):
99
        abort(403)
100
101
    try:
102
        user = authentication_service.authenticate(screen_name, password)
103
    except AuthenticationFailed:
104
        abort(403)
105
106
    if in_admin_mode:
107
        permissions = service.get_permissions_for_user(user.id)
108
        if AdminPermission.access not in permissions:
109
            # The user lacks the admin access permission which is required
110
            # to enter the admin area.
111
            abort(403)
112
113
    if not in_admin_mode:
114
        required_consent_subject_ids = _get_required_consent_subject_ids()
115
        if _is_consent_required(user.id, required_consent_subject_ids):
116
            verification_token = verification_token_service.create_for_terms_consent(
117
                user.id
118
            )
119
120
            consent_form_url = url_for(
121
                'consent.consent_form', token=verification_token.token
122
            )
123
124
            return [('Location', consent_form_url)]
125
126
    # Authorization succeeded.
127
128
    session_token = session_service.get_session_token(user.id)
129
130
    service.create_login_event(user.id, request.remote_addr)
131
132
    user_session.start(user.id, session_token.token, permanent=permanent)
133
    flash_success(f'Erfolgreich eingeloggt als {user.screen_name}.')
134
135
136
def _is_login_allowed():
137
    value = site_settings_service.find_setting_value(g.site_id, 'login_enabled')
138
    return value != 'false'
139
140
141
def _get_required_consent_subject_ids():
142
    subject_ids = []
143
144
    terms_version = terms_version_service.find_current_version_for_brand(
145
        g.brand_id
146
    )
147
    if terms_version:
148
        subject_ids.append(terms_version.consent_subject_id)
149
150
    privacy_policy_consent_subject_id = (
151
        _find_privacy_policy_consent_subject_id()
152
    )
153
154
    if privacy_policy_consent_subject_id:
155
        subject_ids.append(privacy_policy_consent_subject_id)
156
157
    return subject_ids
158
159
160
def _is_consent_required(user_id, subject_ids):
161
    for subject_id in subject_ids:
162
        if not consent_service.has_user_consented_to_subject(
163
            user_id, subject_id
164
        ):
165
            return True
166
167
    return False
168
169
170
@blueprint.route('/logout', methods=['POST'])
171
@respond_no_content
172
def logout():
173
    """Log out user by deleting the corresponding cookie."""
174
    user_session.end()
175
    flash_success('Erfolgreich ausgeloggt.')
176
177
178
# -------------------------------------------------------------------- #
179
# password update
180
181
182
@blueprint.route('/password/update')
183
@templated
184
def password_update_form(erroneous_form=None):
185
    """Show a form to update the current user's password."""
186
    _get_current_user_or_404()
187
188
    form = erroneous_form if erroneous_form else UpdatePasswordForm()
189
190
    return {'form': form}
191
192
193
@blueprint.route('/password', methods=['POST'])
194
def password_update():
195
    """Update the current user's password."""
196
    user = _get_current_user_or_404()
197
198
    form = UpdatePasswordForm(request.form)
199
200
    if not form.validate():
201
        return password_update_form(form)
202
203
    password = form.new_password.data
204
205
    password_service.update_password_hash(user.id, password, user.id)
206
207
    flash_success('Dein Passwort wurde geändert. Bitte melde dich erneut an.')
208
    return redirect_to('.login_form')
209
210
211
# -------------------------------------------------------------------- #
212
# password reset
213
214
215
@blueprint.route('/password/reset/request')
216
@templated
217
def request_password_reset_form(erroneous_form=None):
218
    """Show a form to request a password reset."""
219
    form = erroneous_form if erroneous_form else RequestPasswordResetForm()
220
221
    return {'form': form}
222
223
224
@blueprint.route('/password/reset/request', methods=['POST'])
225
def request_password_reset():
226
    """Request a password reset."""
227
    form = RequestPasswordResetForm(request.form)
228
    if not form.validate():
229
        return request_password_reset_form(form)
230
231
    screen_name = form.screen_name.data.strip()
232
    user = user_service.find_user_by_screen_name(screen_name)
233
234
    if user is None:
235
        flash_error(f'Der Benutzername "{screen_name}" ist unbekannt.')
236
        return request_password_reset_form(form)
237
238
    if not user.email_address_verified:
239
        flash_error(
240
            f'Die E-Mail-Adresse für das Benutzerkonto "{screen_name}" '
241
            'wurde noch nicht bestätigt.'
242
        )
243
        return redirect_to('user_email_address.request_confirmation_email')
244
245
    sender = None
246
    if get_site_mode().is_public():
247
        site = site_service.get_site(g.site_id)
248
        email_config = email_service.get_config(site.email_config_id)
249
        sender = email_config.sender
250
251
    password_reset_service.prepare_password_reset(user, sender=sender)
252
253
    flash_success(
254
        'Ein Link zum Setzen eines neuen Passworts '
255
        f'für den Benutzernamen "{user.screen_name}" '
256
        'wurde an die hinterlegte E-Mail-Adresse versendet.'
257
    )
258
    return request_password_reset_form()
259
260
261
@blueprint.route('/password/reset/token/<token>')
262
@templated
263
def password_reset_form(token, erroneous_form=None):
264
    """Show a form to reset the current user's password."""
265
    verification_token = verification_token_service.find_for_password_reset_by_token(
266
        token
267
    )
268
269
    _verify_password_reset_token(verification_token)
270
271
    form = erroneous_form if erroneous_form else ResetPasswordForm()
272
273
    return {
274
        'form': form,
275
        'token': token,
276
    }
277
278
279
@blueprint.route('/password/reset/token/<token>', methods=['POST'])
280
def password_reset(token):
281
    """Reset the current user's password."""
282
    verification_token = verification_token_service.find_for_password_reset_by_token(
283
        token
284
    )
285
286
    _verify_password_reset_token(verification_token)
287
288
    form = ResetPasswordForm(request.form)
289
    if not form.validate():
290
        return password_reset_form(token, form)
291
292
    password = form.new_password.data
293
294
    password_reset_service.reset_password(verification_token, password)
295
296
    flash_success('Das Passwort wurde geändert.')
297
    return redirect_to('.login_form')
298
299
300
def _verify_password_reset_token(verification_token):
301
    if verification_token is None or verification_token.is_expired:
302
        flash_error(
303
            'Es wurde kein gültiges Token angegeben. '
304
            'Ein Token ist nur 24 Stunden lang gültig.'
305
        )
306
        abort(404)
307
308
309
# -------------------------------------------------------------------- #
310
# helpers
311
312
313
def _is_user_account_creation_enabled(in_admin_mode):
314
    if in_admin_mode:
315
        return False
316
317
    site = site_service.get_site(g.site_id)
318
    return site.user_account_creation_enabled
319
320
321
def _get_current_user_or_404():
322
    user = g.current_user
323
    if not user.is_active:
324
        abort(404)
325
326
    return user
327