byceps.blueprints.admin.user.views   F
last analyzed

Complexity

Total Complexity 66

Size/Duplication

Total Lines 893
Duplicated Lines 12.54 %

Test Coverage

Coverage 55.05%

Importance

Changes 0
Metric Value
eloc 561
dl 112
loc 893
ccs 207
cts 376
cp 0.5505
rs 3.12
c 0
b 0
f 0
wmc 66

31 Functions

Rating   Name   Duplication   Size   Complexity  
A delete_account_form() 0 23 3
A change_email_address() 0 29 2
A change_screen_name_form() 0 13 2
A invalidate_email_address_form() 0 18 3
A change_email_address_form() 0 13 2
A delete_account() 0 34 3
A change_screen_name() 0 30 2
A change_details_form() 0 17 2
A _calculate_days_since() 0 5 2
A unsuspend_account() 34 34 3
A change_details() 0 38 2
A _get_user_for_admin_or_404() 0 7 2
B create_account() 0 65 5
B view() 0 51 1
A set_password() 0 27 2
A role_deassign() 0 20 1
A _get_role_or_404() 0 7 2
A view_events() 0 22 2
A manage_roles() 0 18 1
A invalidate_email_address() 0 33 3
A set_password_form() 0 13 2
A role_assign() 0 21 2
A initialize_account() 0 15 1
A unsuspend_account_form() 22 22 3
A suspend_account_form() 22 22 3
A index() 0 35 1
A create_account_form() 0 9 2
A _get_user_or_404() 0 7 2
A view_permissions() 0 19 1
A _index_permissions_by_role() 0 15 1
A suspend_account() 34 34 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like byceps.blueprints.admin.user.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.blueprints.admin.user.views
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2023 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10
11 1
from datetime import datetime
12
13 1
from flask import abort, g, request
14 1
from flask_babel import gettext
15
16 1
from byceps.services.authn.password import authn_password_service
17 1
from byceps.services.authn.session import authn_session_service
18 1
from byceps.services.authz import authz_service
19 1
from byceps.services.authz.models import Permission, PermissionID, Role
20 1
from byceps.services.country import country_service
21 1
from byceps.services.orga_team import orga_team_service
22 1
from byceps.services.shop.order import order_service
23 1
from byceps.services.shop.shop import shop_service
24 1
from byceps.services.site import site_service
25 1
from byceps.services.user import (
26
    user_command_service,
27
    user_creation_service,
28
    user_deletion_service,
29
    user_email_address_service,
30
    user_service,
31
    user_stats_service,
32
)
33 1
from byceps.services.user.models.user import UserForAdmin, UserStateFilter
34 1
from byceps.services.user_badge import user_badge_awarding_service
35 1
from byceps.signals import authn as authn_signals
36 1
from byceps.signals import authz as authz_signals
37 1
from byceps.signals import user as user_signals
38 1
from byceps.util.authz import permission_registry
39 1
from byceps.util.framework.blueprint import create_blueprint
40 1
from byceps.util.framework.flash import flash_error, flash_success
41 1
from byceps.util.framework.templating import templated
42 1
from byceps.util.views import (
43
    permission_required,
44
    redirect_to,
45
    respond_no_content,
46
)
47
48 1
from . import service
49 1
from .forms import (
50
    ACCOUNT_DELETION_VERIFICATION_TEXT,
51
    ChangeDetailsForm,
52
    ChangeEmailAddressForm,
53
    ChangeScreenNameForm,
54
    CreateAccountForm,
55
    DeleteAccountForm,
56
    InvalidateEmailAddressForm,
57
    SetPasswordForm,
58
    SuspendAccountForm,
59
)
60
61
62 1
blueprint = create_blueprint('user_admin', __name__)
63
64
65 1
@blueprint.get('/', defaults={'page': 1})
66 1
@blueprint.get('/pages/<int:page>')
67 1
@permission_required('user.view')
68 1
@templated
69 1
def index(page):
70
    """List users."""
71 1
    per_page = request.args.get('per_page', type=int, default=20)
72 1
    search_term = request.args.get('search_term', default='').strip()
73 1
    only = request.args.get('only')
74
75 1
    user_state_filter = UserStateFilter.__members__.get(
76
        only, UserStateFilter.none
77
    )
78
79 1
    users = user_service.get_users_paginated(
80
        page, per_page, search_term=search_term, state_filter=user_state_filter
81
    )
82
83 1
    total_active = user_stats_service.count_active_users()
84 1
    total_uninitialized = user_stats_service.count_uninitialized_users()
85 1
    total_suspended = user_stats_service.count_suspended_users()
86 1
    total_deleted = user_stats_service.count_deleted_users()
87 1
    total_overall = user_stats_service.count_users()
88
89 1
    return {
90
        'users': users,
91
        'total_active': total_active,
92
        'total_uninitialized': total_uninitialized,
93
        'total_suspended': total_suspended,
94
        'total_deleted': total_deleted,
95
        'total_overall': total_overall,
96
        'search_term': search_term,
97
        'only': only,
98
        'UserStateFilter': UserStateFilter,
99
        'user_state_filter': user_state_filter,
100
    }
101
102
103 1
@blueprint.get('/<uuid:user_id>')
104 1
@permission_required('user.view')
105 1
@templated
106 1
def view(user_id):
107
    """Show a user's interal profile."""
108 1
    user = _get_user_for_admin_or_404(user_id)
109 1
    db_user = user_service.find_user_with_details(user.id)
110
111 1
    recent_login = authn_session_service.find_recent_login(user.id)
112 1
    days_since_recent_login = _calculate_days_since(recent_login)
113
114 1
    orga_activities = orga_team_service.get_orga_activities_for_user(user.id)
115
116 1
    newsletter_subscription_states = list(
117
        service.get_newsletter_subscription_states(user.id)
118
    )
119 1
    newsletter_subscription_count = sum(
120
        1 for _, subscribed in newsletter_subscription_states if subscribed
121
    )
122
123 1
    orders = order_service.get_orders_placed_by_user(user.id)
124
125 1
    order_shop_ids = {order.shop_id for order in orders}
126 1
    shops = shop_service.find_shops(order_shop_ids)
127 1
    shops_by_id = {shop.id: shop for shop in shops}
128
129 1
    parties_and_tickets = service.get_parties_and_tickets(user.id)
130 1
    ticket_count = sum(len(tickets) for _, tickets in parties_and_tickets)
131
132 1
    attended_parties = service.get_attended_parties(user.id)
133
134 1
    badges_with_awarding_quantity = (
135
        user_badge_awarding_service.get_badges_awarded_to_user(user.id)
136
    )
137 1
    badge_count = len(badges_with_awarding_quantity)
138
139 1
    return {
140
        'profile_user': user,
141
        'user': db_user,
142
        'recent_login': recent_login,
143
        'days_since_recent_login': days_since_recent_login,
144
        'orga_activities': orga_activities,
145
        'newsletter_subscription_count': newsletter_subscription_count,
146
        'newsletter_subscription_states': newsletter_subscription_states,
147
        'orders': orders,
148
        'shops_by_id': shops_by_id,
149
        'parties_and_tickets': parties_and_tickets,
150
        'ticket_count': ticket_count,
151
        'attended_parties': attended_parties,
152
        'badge_count': badge_count,
153
        'badges_with_awarding_quantity': badges_with_awarding_quantity,
154
    }
155
156
157 1
def _calculate_days_since(dt: datetime | None) -> int | None:
158 1
    if dt is None:
159 1
        return None
160
161
    return (datetime.utcnow().date() - dt.date()).days
162
163
164
# -------------------------------------------------------------------- #
165
# account
166
167
168 1
@blueprint.get('/create')
169 1
@permission_required('user.create')
170 1
@templated
171 1
def create_account_form(erroneous_form=None):
172
    """Show a form to create a user account."""
173 1
    form = erroneous_form if erroneous_form else CreateAccountForm()
174 1
    form.set_site_choices()
175
176 1
    return {'form': form}
177
178
179 1
@blueprint.post('/')
180 1
@permission_required('user.create')
181 1
def create_account():
182
    """Create a user account."""
183
    form = CreateAccountForm(request.form)
184
    form.set_site_choices()
185
186
    if not form.validate():
187
        return create_account_form(form)
188
189
    screen_name = form.screen_name.data.strip()
190
    first_name = form.first_name.data.strip()
191
    last_name = form.last_name.data.strip()
192
    email_address = form.email_address.data.lower()
193
    password = form.password.data
194
    site_id_for_email = form.site_id.data
195
196
    if site_id_for_email:
197
        site_for_email = site_service.get_site(site_id_for_email)
198
    else:
199
        site_for_email = None
200
201
    initiator = g.user
202
203
    creation_result = user_creation_service.create_user(
204
        screen_name,
205
        email_address,
206
        password,
207
        first_name=first_name,
208
        last_name=last_name,
209
        creation_method='admin app',
210
        creator=initiator,
211
        # Do not pass site ID here; the account is not created on a site.
212
        ip_address=request.remote_addr,
213
    )
214
    if creation_result.is_err():
215
        flash_error(
216
            gettext(
217
                'User "%(screen_name)s" could not be created.',
218
                screen_name=screen_name,
219
            )
220
        )
221
        return create_account_form(form)
222
223
    user, event = creation_result.unwrap()
224
225
    flash_success(
226
        gettext(
227
            'User "%(screen_name)s" has been created.',
228
            screen_name=user.screen_name,
229
        )
230
    )
231
232
    if site_for_email:
233
        user_creation_service.request_email_address_confirmation(
234
            user, email_address, site_for_email.id
235
        ).unwrap()
236
        flash_success(
237
            gettext('An email has been sent to the corresponding address.'),
238
            icon='email',
239
        )
240
241
    user_signals.account_created.send(None, event=event)
242
243
    return redirect_to('.view', user_id=user.id)
244
245
246 1
@blueprint.post('/<uuid:user_id>/initialize')
247 1
@permission_required('user.administrate')
248 1
@respond_no_content
249 1
def initialize_account(user_id):
250
    """Initialize the user account."""
251
    user = _get_user_or_404(user_id)
252
253
    initiator = g.user
254
255
    user_command_service.initialize_account(user, initiator=initiator)
256
257
    flash_success(
258
        gettext(
259
            "User '%(screen_name)s' has been initialized.",
260
            screen_name=user.screen_name,
261
        )
262
    )
263
264
265 1 View Code Duplication
@blueprint.get('/<uuid:user_id>/suspend')
266 1
@permission_required('user.administrate')
267 1
@templated
268 1
def suspend_account_form(user_id, erroneous_form=None):
269
    """Show form to suspend the user account."""
270 1
    user = _get_user_for_admin_or_404(user_id)
271
272 1
    if user.suspended:
273 1
        flash_error(
274
            gettext(
275
                "User '%(screen_name)s' is already suspended.",
276
                screen_name=user.screen_name,
277
            )
278
        )
279 1
        return redirect_to('.view', user_id=user.id)
280
281 1
    form = erroneous_form if erroneous_form else SuspendAccountForm()
282
283 1
    return {
284
        'profile_user': user,
285
        'user': user,
286
        'form': form,
287
    }
288
289
290 1 View Code Duplication
@blueprint.post('/<uuid:user_id>/suspend')
291 1
@permission_required('user.administrate')
292 1
def suspend_account(user_id):
293
    """Suspend the user account."""
294
    user = _get_user_or_404(user_id)
295
296
    if user.suspended:
297
        flash_error(
298
            gettext(
299
                "User '%(screen_name)s' is already suspended.",
300
                screen_name=user.screen_name,
301
            )
302
        )
303
        return redirect_to('.view', user_id=user.id)
304
305
    form = SuspendAccountForm(request.form)
306
    if not form.validate():
307
        return suspend_account_form(user.id, form)
308
309
    initiator = g.user
310
    reason = form.reason.data.strip()
311
312
    event = user_command_service.suspend_account(user, initiator, reason)
313
314
    user_signals.account_suspended.send(None, event=event)
315
316
    flash_success(
317
        gettext(
318
            "User '%(screen_name)s' has been suspended.",
319
            screen_name=user.screen_name,
320
        )
321
    )
322
323
    return redirect_to('.view', user_id=user.id)
324
325
326 1 View Code Duplication
@blueprint.get('/<uuid:user_id>/unsuspend')
327 1
@permission_required('user.administrate')
328 1
@templated
329 1
def unsuspend_account_form(user_id, erroneous_form=None):
330
    """Show form to unsuspend the user account."""
331 1
    user = _get_user_for_admin_or_404(user_id)
332
333 1
    if not user.suspended:
334 1
        flash_error(
335
            gettext(
336
                "User '%(screen_name)s' is not suspended.",
337
                screen_name=user.screen_name,
338
            )
339
        )
340 1
        return redirect_to('.view', user_id=user.id)
341
342 1
    form = erroneous_form if erroneous_form else SuspendAccountForm()
343
344 1
    return {
345
        'profile_user': user,
346
        'user': user,
347
        'form': form,
348
    }
349
350
351 1 View Code Duplication
@blueprint.post('/<uuid:user_id>/unsuspend')
352 1
@permission_required('user.administrate')
353 1
def unsuspend_account(user_id):
354
    """Unsuspend the user account."""
355
    user = _get_user_or_404(user_id)
356
357
    if not user.suspended:
358
        flash_error(
359
            gettext(
360
                "User '%(screen_name)s' is not suspended.",
361
                screen_name=user.screen_name,
362
            )
363
        )
364
        return redirect_to('.view', user_id=user.id)
365
366
    form = SuspendAccountForm(request.form)
367
    if not form.validate():
368
        return unsuspend_account_form(user.id, form)
369
370
    initiator = g.user
371
    reason = form.reason.data.strip()
372
373
    event = user_command_service.unsuspend_account(user, initiator, reason)
374
375
    user_signals.account_unsuspended.send(None, event=event)
376
377
    flash_success(
378
        gettext(
379
            "User '%(screen_name)s' has been unsuspended.",
380
            screen_name=user.screen_name,
381
        )
382
    )
383
384
    return redirect_to('.view', user_id=user.id)
385
386
387 1
@blueprint.get('/<uuid:user_id>/delete')
388 1
@permission_required('user.administrate')
389 1
@templated
390 1
def delete_account_form(user_id, erroneous_form=None):
391
    """Show form to delete the user account."""
392 1
    user = _get_user_for_admin_or_404(user_id)
393
394 1
    if user.deleted:
395 1
        flash_error(
396
            gettext(
397
                "User '%(screen_name)s' has already been deleted.",
398
                screen_name=user.screen_name,
399
            )
400
        )
401 1
        return redirect_to('.view', user_id=user.id)
402
403 1
    form = erroneous_form if erroneous_form else DeleteAccountForm()
404
405 1
    return {
406
        'profile_user': user,
407
        'user': user,
408
        'form': form,
409
        'verification_text': ACCOUNT_DELETION_VERIFICATION_TEXT,
410
    }
411
412
413 1
@blueprint.post('/<uuid:user_id>/delete')
414 1
@permission_required('user.administrate')
415 1
def delete_account(user_id):
416
    """Delete the user account."""
417
    user = _get_user_or_404(user_id)
418
419
    if user.deleted:
420
        flash_error(
421
            gettext(
422
                "User '%(screen_name)s' has already been deleted.",
423
                screen_name=user.screen_name,
424
            )
425
        )
426
        return redirect_to('.view', user_id=user.id)
427
428
    form = DeleteAccountForm(request.form)
429
    if not form.validate():
430
        return delete_account_form(user.id, form)
431
432
    initiator = g.user
433
    reason = form.reason.data.strip()
434
435
    event = user_deletion_service.delete_account(user, initiator, reason)
436
437
    user_signals.account_deleted.send(None, event=event)
438
439
    flash_success(
440
        gettext(
441
            "User '%(screen_name)s' has been deleted.",
442
            screen_name=user.screen_name,
443
        )
444
    )
445
446
    return redirect_to('.view', user_id=user.id)
447
448
449
# -------------------------------------------------------------------- #
450
# screen name
451
452
453 1
@blueprint.get('/<uuid:user_id>/change_screen_name')
454 1
@permission_required('user.administrate')
455 1
@templated
456 1
def change_screen_name_form(user_id, erroneous_form=None):
457
    """Show form to change the user's screen name."""
458 1
    user = _get_user_for_admin_or_404(user_id)
459
460 1
    form = erroneous_form if erroneous_form else ChangeScreenNameForm()
461
462 1
    return {
463
        'profile_user': user,
464
        'user': user,
465
        'form': form,
466
    }
467
468
469 1
@blueprint.post('/<uuid:user_id>/change_screen_name')
470 1
@permission_required('user.administrate')
471 1
def change_screen_name(user_id):
472
    """Change the user's screen name."""
473
    user = _get_user_or_404(user_id)
474
475
    form = ChangeScreenNameForm(request.form)
476
    if not form.validate():
477
        return change_screen_name_form(user.id, form)
478
479
    old_screen_name = user.screen_name
480
    new_screen_name = form.screen_name.data.strip()
481
    initiator = g.user
482
    reason = form.reason.data.strip()
483
484
    event = user_command_service.change_screen_name(
485
        user, new_screen_name, initiator, reason=reason
486
    )
487
488
    user_signals.screen_name_changed.send(None, event=event)
489
490
    flash_success(
491
        gettext(
492
            "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.",
493
            old_screen_name=old_screen_name,
494
            new_screen_name=new_screen_name,
495
        )
496
    )
497
498
    return redirect_to('.view', user_id=user.id)
499
500
501
# -------------------------------------------------------------------- #
502
# email address
503
504
505 1
@blueprint.get('/<uuid:user_id>/change_email_address')
506 1
@permission_required('user.administrate')
507 1
@templated
508 1
def change_email_address_form(user_id, erroneous_form=None):
509
    """Show form to change the user's e-mail address."""
510 1
    user = _get_user_for_admin_or_404(user_id)
511
512 1
    form = erroneous_form if erroneous_form else ChangeEmailAddressForm()
513
514 1
    return {
515
        'profile_user': user,
516
        'user': user,
517
        'form': form,
518
    }
519
520
521 1
@blueprint.post('/<uuid:user_id>/change_email_address')
522 1
@permission_required('user.administrate')
523 1
def change_email_address(user_id):
524
    """Change the user's e-mail address."""
525
    user = _get_user_or_404(user_id)
526
527
    form = ChangeEmailAddressForm(request.form)
528
    if not form.validate():
529
        return change_email_address_form(user.id, form)
530
531
    new_email_address = form.email_address.data.strip()
532
    verified = False
533
    initiator = g.user
534
    reason = form.reason.data.strip()
535
536
    event = user_command_service.change_email_address(
537
        user, new_email_address, verified, initiator, reason=reason
538
    )
539
540
    user_signals.email_address_changed.send(None, event=event)
541
542
    flash_success(
543
        gettext(
544
            "Email address for user '%(screen_name)s' has been updated.",
545
            screen_name=user.screen_name,
546
        )
547
    )
548
549
    return redirect_to('.view', user_id=user.id)
550
551
552 1
@blueprint.get('/<uuid:user_id>/invalidate_email_address')
553 1
@permission_required('user.administrate')
554 1
@templated
555 1
def invalidate_email_address_form(user_id, erroneous_form=None):
556
    """Show form to invalidate the email address assigned with the account."""
557
    user = _get_user_for_admin_or_404(user_id)
558
559
    email_address = user_service.get_email_address_data(user_id)
560
    if not email_address.verified:
561
        flash_error(gettext('Email address is already invalidated.'))
562
        return redirect_to('.view', user_id=user.id)
563
564
    form = erroneous_form if erroneous_form else InvalidateEmailAddressForm()
565
566
    return {
567
        'profile_user': user,
568
        'user': user,
569
        'form': form,
570
    }
571
572
573 1
@blueprint.post('/<uuid:user_id>/invalidate_email_address')
574 1
@permission_required('user.administrate')
575 1
def invalidate_email_address(user_id):
576
    """Invalidate the email address assigned with the account."""
577
    user = _get_user_or_404(user_id)
578
579
    form = InvalidateEmailAddressForm(request.form)
580
    if not form.validate():
581
        return invalidate_email_address_form(user.id, form)
582
583
    initiator = g.user
584
    reason = form.reason.data.strip()
585
586
    invalidation_result = user_email_address_service.invalidate_email_address(
587
        user, reason, initiator=initiator
588
    )
589
590
    if invalidation_result.is_err():
591
        flash_error(invalidation_result.unwrap_err())
592
        return redirect_to('.view', user_id=user.id)
593
594
    event = invalidation_result.unwrap()
595
596
    user_signals.email_address_invalidated.send(None, event=event)
597
598
    flash_success(
599
        gettext(
600
            "The email address of user '%(screen_name)s' has been invalidated.",
601
            screen_name=user.screen_name,
602
        )
603
    )
604
605
    return redirect_to('.view', user_id=user.id)
606
607
608
# -------------------------------------------------------------------- #
609
# details
610
611
612 1
@blueprint.get('/<uuid:user_id>/details')
613 1
@permission_required('user.administrate')
614 1
@templated
615 1
def change_details_form(user_id, erroneous_form=None):
616
    """Show a form to change the user's details."""
617
    user = _get_user_for_admin_or_404(user_id)
618
619
    detail = user_service.get_detail(user_id)
620
621
    form = erroneous_form if erroneous_form else ChangeDetailsForm(obj=detail)
622
    country_names = country_service.get_country_names()
623
624
    return {
625
        'profile_user': user,
626
        'user': user,
627
        'form': form,
628
        'country_names': country_names,
629
    }
630
631
632 1
@blueprint.post('/<uuid:user_id>/details')
633 1
@permission_required('user.administrate')
634 1
def change_details(user_id):
635
    """Change the user's details."""
636
    user = _get_user_or_404(user_id)
637
638
    form = ChangeDetailsForm(request.form)
639
    if not form.validate():
640
        return change_details_form(user.id, form)
641
642
    first_name = form.first_name.data.strip()
643
    last_name = form.last_name.data.strip()
644
    date_of_birth = form.date_of_birth.data
645
    country = form.country.data.strip()
646
    zip_code = form.zip_code.data.strip()
647
    city = form.city.data.strip()
648
    street = form.street.data.strip()
649
    phone_number = form.phone_number.data.strip()
650
    initiator = g.user
651
652
    event = user_command_service.update_user_details(
653
        user.id,
654
        first_name,
655
        last_name,
656
        date_of_birth,
657
        country,
658
        zip_code,
659
        city,
660
        street,
661
        phone_number,
662
        initiator,
663
    )
664
665
    flash_success(gettext('Changes have been saved.'))
666
667
    user_signals.details_updated.send(None, event=event)
668
669
    return redirect_to('.view', user_id=user.id)
670
671
672
# -------------------------------------------------------------------- #
673
# authentication
674
675
676 1
@blueprint.get('/<uuid:user_id>/password')
677 1
@permission_required('user.set_password')
678 1
@templated
679 1
def set_password_form(user_id, erroneous_form=None):
680
    """Show a form to set a new password for the user."""
681 1
    user = _get_user_for_admin_or_404(user_id)
682
683 1
    form = erroneous_form if erroneous_form else SetPasswordForm()
684
685 1
    return {
686
        'profile_user': user,
687
        'user': user,
688
        'form': form,
689
    }
690
691
692 1
@blueprint.post('/<uuid:user_id>/password')
693 1
@permission_required('user.set_password')
694 1
def set_password(user_id):
695
    """Set a new password for the user."""
696
    user = _get_user_or_404(user_id)
697
698
    form = SetPasswordForm(request.form)
699
    if not form.validate():
700
        return set_password_form(user.id, form)
701
702
    new_password = form.password.data
703
    initiator = g.user
704
705
    event = authn_password_service.update_password_hash(
706
        user, new_password, initiator
707
    )
708
709
    authn_signals.password_updated.send(None, event=event)
710
711
    flash_success(
712
        gettext(
713
            "New password has been set for user '%(screen_name)s'.",
714
            screen_name=user.screen_name,
715
        )
716
    )
717
718
    return redirect_to('.view', user_id=user.id)
719
720
721
# -------------------------------------------------------------------- #
722
# authorization
723
724
725 1
@blueprint.get('/<uuid:user_id>/permissions')
726 1
@permission_required('user.view')
727 1
@templated
728 1
def view_permissions(user_id):
729
    """Show user's permissions."""
730 1
    user = _get_user_for_admin_or_404(user_id)
731
732 1
    user_permission_ids_by_role = (
733
        authz_service.get_permission_ids_by_role_for_user(user.id)
734
    )
735
736 1
    permissions_by_role = _index_permissions_by_role(
737
        user_permission_ids_by_role
738
    )
739
740 1
    return {
741
        'profile_user': user,
742
        'user': user,
743
        'permissions_by_role': permissions_by_role,
744
    }
745
746
747 1
@blueprint.get('/<uuid:user_id>/roles/assignment')
748 1
@permission_required('role.assign')
749 1
@templated
750 1
def manage_roles(user_id):
751
    """Manage what roles are assigned to the user."""
752 1
    user = _get_user_for_admin_or_404(user_id)
753
754 1
    permission_ids_by_role = authz_service.get_permission_ids_by_role()
755
756 1
    permissions_by_role = _index_permissions_by_role(permission_ids_by_role)
757
758 1
    user_role_ids = authz_service.find_role_ids_for_user(user.id)
759
760 1
    return {
761
        'profile_user': user,
762
        'user': user,
763
        'permissions_by_role': permissions_by_role,
764
        'user_role_ids': user_role_ids,
765
    }
766
767
768 1
def _index_permissions_by_role(
769
    permission_ids_by_role: dict[Role, frozenset[PermissionID]]
770
) -> dict[Role, frozenset[Permission]]:
771 1
    registered_permissions_by_id = {
772
        permission.id: permission
773
        for permission in permission_registry.get_registered_permissions()
774
    }
775
776 1
    return {
777
        role: frozenset(
778
            registered_permissions_by_id[permission_id]
779
            for permission_id in permission_ids
780
            if permission_id in registered_permissions_by_id
781
        )
782
        for role, permission_ids in permission_ids_by_role.items()
783
    }
784
785
786 1
@blueprint.post('/<uuid:user_id>/roles/<role_id>')
787 1
@permission_required('role.assign')
788 1
@respond_no_content
789 1
def role_assign(user_id, role_id):
790
    """Assign the role to the user."""
791
    user = _get_user_or_404(user_id)
792
    role = _get_role_or_404(role_id)
793
    initiator = g.user
794
795
    event = authz_service.assign_role_to_user(
796
        role.id, user, initiator=initiator
797
    )
798
799
    if event is not None:
800
        authz_signals.role_assigned_to_user.send(None, event=event)
801
802
    flash_success(
803
        gettext(
804
            '%(role_title)s has been assigned to "%(screen_name)s".',
805
            screen_name=user.screen_name,
806
            role_title=role.title,
807
        )
808
    )
809
810
811 1
@blueprint.delete('/<uuid:user_id>/roles/<role_id>')
812 1
@permission_required('role.assign')
813 1
@respond_no_content
814 1
def role_deassign(user_id, role_id):
815
    """Deassign the role from the user."""
816
    user = _get_user_or_404(user_id)
817
    role = _get_role_or_404(role_id)
818
    initiator = g.user
819
820
    event = authz_service.deassign_role_from_user(
821
        role.id, user, initiator=initiator
822
    ).unwrap()
823
824
    authz_signals.role_deassigned_from_user.send(None, event=event)
825
826
    flash_success(
827
        gettext(
828
            '%(role_title)s has been withdrawn from "%(screen_name)s".',
829
            screen_name=user.screen_name,
830
            role_title=role.title,
831
        )
832
    )
833
834
835
# -------------------------------------------------------------------- #
836
# events
837
838
839 1
@blueprint.get('/<uuid:user_id>/events')
840 1
@permission_required('user.view')
841 1
@templated
842 1
def view_events(user_id):
843
    """Show user's events."""
844 1
    user = _get_user_for_admin_or_404(user_id)
845
846 1
    log_entries = list(service.get_log_entries(user.id))
847
848 1
    include_logins = request.args.get('include_logins', default='yes') == 'yes'
849 1
    if not include_logins:
850
        log_entries = [
851
            entry
852
            for entry in log_entries
853
            if entry['event_type'] != 'user-logged-in'
854
        ]
855
856 1
    return {
857
        'profile_user': user,
858
        'user': user,
859
        'log_entries': log_entries,
860
        'logins_included': include_logins,
861
    }
862
863
864
# -------------------------------------------------------------------- #
865
# helpers
866
867
868 1
def _get_user_for_admin_or_404(user_id) -> UserForAdmin:
869 1
    user = user_service.find_user_for_admin(user_id)
870
871 1
    if user is None:
872
        abort(404)
873
874 1
    return user
875
876
877 1
def _get_user_or_404(user_id):
878
    user = user_service.find_user(user_id)
879
880
    if user is None:
881
        abort(404)
882
883
    return user
884
885
886 1
def _get_role_or_404(role_id):
887
    role = authz_service.find_role(role_id)
888
889
    if role is None:
890
        abort(404)
891
892
    return role
893