Passed
Push — main ( cf3570...95bc96 )
by Jochen
04:50
created

change_screen_name()   A

Complexity

Conditions 2

Size

Total Lines 30
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 4.5185

Importance

Changes 0
Metric Value
cc 2
eloc 20
nop 1
dl 0
loc 30
ccs 2
cts 14
cp 0.1429
crap 4.5185
rs 9.4
c 0
b 0
f 0
1
"""
2
byceps.blueprints.admin.user.views
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime
11 1
from typing import Optional
12
13 1
from flask import abort, g, request
14 1
from flask_babel import gettext
15
16 1
from ....services.authentication.password import service as password_service
17 1
from ....services.authentication.session import service as session_service
18 1
from ....services.authorization import service as authorization_service
19 1
from ....services.authorization.transfer.models import (
20
    Role,
21
    Permission,
22
    PermissionID,
23
)
24 1
from ....services.country import service as country_service
25 1
from ....services.orga_team import service as orga_team_service
26 1
from ....services.shop.order import service as order_service
27 1
from ....services.shop.shop import service as shop_service
28 1
from ....services.site import service as site_service
29 1
from ....services.user import (
30
    command_service as user_command_service,
31
    creation_service as user_creation_service,
32
    deletion_service as user_deletion_service,
33
    email_address_verification_service,
34
    service as user_service,
35
    stats_service as user_stats_service,
36
)
37 1
from ....services.user.transfer.models import UserForAdmin, UserStateFilter
38 1
from ....services.user_badge import awarding_service as badge_awarding_service
39 1
from ....signals import user as user_signals
40 1
from ....util.authorization import permission_registry
41 1
from ....util.framework.blueprint import create_blueprint
42 1
from ....util.framework.flash import flash_error, flash_success
43 1
from ....util.framework.templating import templated
44 1
from ....util.views import permission_required, redirect_to, respond_no_content
45
46 1
from .forms import (
47
    ChangeDetailsForm,
48
    ChangeEmailAddressForm,
49
    ChangeScreenNameForm,
50
    CreateAccountForm,
51
    DeleteAccountForm,
52
    InvalidateEmailAddressForm,
53
    SetPasswordForm,
54
    SuspendAccountForm,
55
)
56 1
from . import service
57
58
59 1
# Blueprint named 'user_admin'; the view functions in this module are
# registered on it through the `@blueprint.<method>(...)` decorators.
blueprint = create_blueprint('user_admin', __name__)
60
61
62 1
@blueprint.get('/', defaults={'page': 1})
@blueprint.get('/pages/<int:page>')
@permission_required('user.view')
@templated
def index(page):
    """Show a paginated, optionally filtered listing of user accounts.

    Query string arguments:

    - ``per_page``: integer page size, 20 if absent.
    - ``search_term``: search text; surrounding whitespace is dropped.
    - ``only``: name of a ``UserStateFilter`` member. Any other value
      (or none at all) selects ``UserStateFilter.none``.

    Returns the template context, which also carries the user counts
    per account state.
    """
    args = request.args
    per_page = args.get('per_page', type=int, default=20)
    search_term = args.get('search_term', default='').strip()
    only = args.get('only')

    # Look the filter up by member name; unknown names mean "no filter".
    state_filter = UserStateFilter.__members__.get(only, UserStateFilter.none)

    return {
        'users': user_service.get_users_paginated(
            page, per_page, search_term=search_term, state_filter=state_filter
        ),
        'total_active': user_stats_service.count_active_users(),
        'total_uninitialized': user_stats_service.count_uninitialized_users(),
        'total_suspended': user_stats_service.count_suspended_users(),
        'total_deleted': user_stats_service.count_deleted_users(),
        'total_overall': user_stats_service.count_users(),
        'search_term': search_term,
        'only': only,
        'UserStateFilter': UserStateFilter,
        'user_state_filter': state_filter,
    }
98
99
100 1
@blueprint.get('/<uuid:user_id>')
@permission_required('user.view')
@templated
def view(user_id):
    """Show a user's internal profile.

    Gathers the most recent login, orga activities, newsletter
    subscription states, orders (plus the shops they were placed in),
    parties and tickets, attended parties and awarded badges for the
    user. Aborts with 404 if the user is unknown.
    """
    profile_user = _get_user_for_admin_or_404(user_id)
    uid = profile_user.id

    user_with_details = user_service.find_user_with_details(uid)

    last_login = session_service.find_recent_login(uid)
    days_since_last_login = _calculate_days_since(last_login)

    activities = orga_team_service.get_orga_activities_for_user(uid)

    # Materialize the states: they are counted here and handed to the
    # template as well.
    subscription_states = list(service.get_newsletter_subscription_states(uid))
    subscription_count = sum(
        1 for _, is_subscribed in subscription_states if is_subscribed
    )

    placed_orders = order_service.get_orders_placed_by_user(uid)
    shop_ids = {placed_order.shop_id for placed_order in placed_orders}
    shops_by_id = {shop.id: shop for shop in shop_service.find_shops(shop_ids)}

    parties_and_tickets = service.get_parties_and_tickets(uid)
    ticket_total = sum(len(tickets) for _, tickets in parties_and_tickets)

    parties_attended = service.get_attended_parties(uid)

    awarded_badges = badge_awarding_service.get_badges_awarded_to_user(uid)
    badge_total = len(awarded_badges)

    return {
        'profile_user': profile_user,
        'user': user_with_details,
        'recent_login': last_login,
        'days_since_recent_login': days_since_last_login,
        'orga_activities': activities,
        'newsletter_subscription_count': subscription_count,
        'newsletter_subscription_states': subscription_states,
        'orders': placed_orders,
        'shops_by_id': shops_by_id,
        'parties_and_tickets': parties_and_tickets,
        'ticket_count': ticket_total,
        'attended_parties': parties_attended,
        'badge_count': badge_total,
        'badges_with_awarding_quantity': awarded_badges,
    }
152
153
154 1
def _calculate_days_since(dt: Optional[datetime]) -> Optional[int]:
155 1
    if dt is None:
156 1
        return None
157
158
    return (datetime.utcnow().date() - dt.date()).days
159
160
161
# -------------------------------------------------------------------- #
162
# account
163
164
165 1
@blueprint.get('/create')
@permission_required('user.create')
@templated
def create_account_form(erroneous_form=None):
    """Show a form to create a user account.

    `erroneous_form` is an already submitted form to show again; a
    blank form is used when it is not given. The site choices are
    (re)populated in either case.
    """
    form = erroneous_form or CreateAccountForm()
    form.set_site_choices()

    return dict(form=form)
174
175
176 1
@blueprint.post('/')
@permission_required('user.create')
def create_account():
    """Create a user account from the submitted form.

    Shows the form again if validation or the creation itself fails.
    On success, an email address confirmation is requested if a site
    was selected, the `account_created` signal is sent, and the
    response redirects to the new user's admin view.
    """
    form = CreateAccountForm(request.form)
    form.set_site_choices()

    if not form.validate():
        return create_account_form(form)

    screen_name = form.screen_name.data.strip()
    first_names = form.first_names.data.strip()
    last_name = form.last_name.data.strip()
    email_address = form.email_address.data.lower()
    password = form.password.data
    site_id = form.site_id.data

    # A site is only needed to send the confirmation email from.
    site_for_email = site_service.get_site(site_id) if site_id else None

    try:
        user, event = user_creation_service.create_user(
            screen_name,
            email_address,
            password,
            first_names=first_names,
            last_name=last_name,
            creator_id=g.user.id,
            # Do not pass site ID here; the account is not created on a site.
        )
    except user_creation_service.UserCreationFailed:
        failure_message = gettext(
            'User "%(screen_name)s" could not be created.',
            screen_name=screen_name,
        )
        flash_error(failure_message)
        return create_account_form(form)

    success_message = gettext(
        'User "%(screen_name)s" has been created.',
        screen_name=user.screen_name,
    )
    flash_success(success_message)

    if site_for_email:
        user_creation_service.request_email_address_confirmation(
            user, email_address, site_for_email.id
        )
        flash_success(
            gettext('An email has been sent to the corresponding address.'),
            icon='email',
        )

    user_signals.account_created.send(None, event=event)

    return redirect_to('.view', user_id=user.id)
238
239
240 1
@blueprint.post('/<uuid:user_id>/initialize')
@permission_required('user.administrate')
@respond_no_content
def initialize_account(user_id):
    """Initialize the user account on behalf of the current user.

    Aborts with 404 if the user is unknown.
    """
    user = _get_user_or_404(user_id)

    user_command_service.initialize_account(user.id, initiator_id=g.user.id)

    message = gettext(
        "User '%(screen_name)s' has been initialized.",
        screen_name=user.screen_name,
    )
    flash_success(message)
257
258
259 1 View Code Duplication
@blueprint.get('/<uuid:user_id>/suspend')
@permission_required('user.administrate')
@templated
def suspend_account_form(user_id, erroneous_form=None):
    """Show form to suspend the user account.

    Redirects to the user's admin view with an error message if the
    account is already suspended. `erroneous_form` is a previously
    submitted form to show again instead of a blank one.
    """
    user = _get_user_for_admin_or_404(user_id)

    if user.suspended:
        message = gettext(
            "User '%(screen_name)s' is already suspended.",
            screen_name=user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=user.id)

    form = erroneous_form or SuspendAccountForm()

    return dict(profile_user=user, user=user, form=form)
282
283
284 1 View Code Duplication
@blueprint.post('/<uuid:user_id>/suspend')
@permission_required('user.administrate')
def suspend_account(user_id):
    """Suspend the user account for the reason given in the form.

    Refuses (with an error message) if the account is already
    suspended, and shows the form again if it does not validate.
    Sends the `account_suspended` signal on success.
    """
    user = _get_user_or_404(user_id)
    back_to_profile = {'user_id': user.id}

    if user.suspended:
        already_message = gettext(
            "User '%(screen_name)s' is already suspended.",
            screen_name=user.screen_name,
        )
        flash_error(already_message)
        return redirect_to('.view', **back_to_profile)

    form = SuspendAccountForm(request.form)
    if not form.validate():
        return suspend_account_form(user.id, form)

    initiator_id = g.user.id
    reason = form.reason.data.strip()

    suspension_event = user_command_service.suspend_account(
        user.id, initiator_id, reason
    )
    user_signals.account_suspended.send(None, event=suspension_event)

    done_message = gettext(
        "User '%(screen_name)s' has been suspended.",
        screen_name=user.screen_name,
    )
    flash_success(done_message)

    return redirect_to('.view', **back_to_profile)
318
319
320 1 View Code Duplication
@blueprint.get('/<uuid:user_id>/unsuspend')
@permission_required('user.administrate')
@templated
def unsuspend_account_form(user_id, erroneous_form=None):
    """Show form to unsuspend the user account.

    Redirects to the user's admin view with an error message if the
    account is not suspended. The form class is shared with the
    suspension view (`SuspendAccountForm`).
    """
    user = _get_user_for_admin_or_404(user_id)

    if not user.suspended:
        message = gettext(
            "User '%(screen_name)s' is not suspended.",
            screen_name=user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=user.id)

    form = erroneous_form or SuspendAccountForm()

    return dict(profile_user=user, user=user, form=form)
343
344
345 1 View Code Duplication
@blueprint.post('/<uuid:user_id>/unsuspend')
@permission_required('user.administrate')
def unsuspend_account(user_id):
    """Lift the suspension of the user account.

    Refuses (with an error message) if the account is not suspended,
    and shows the form again if it does not validate. Sends the
    `account_unsuspended` signal on success.
    """
    user = _get_user_or_404(user_id)
    back_to_profile = {'user_id': user.id}

    if not user.suspended:
        not_suspended_message = gettext(
            "User '%(screen_name)s' is not suspended.",
            screen_name=user.screen_name,
        )
        flash_error(not_suspended_message)
        return redirect_to('.view', **back_to_profile)

    form = SuspendAccountForm(request.form)
    if not form.validate():
        return unsuspend_account_form(user.id, form)

    initiator_id = g.user.id
    reason = form.reason.data.strip()

    unsuspension_event = user_command_service.unsuspend_account(
        user.id, initiator_id, reason
    )
    user_signals.account_unsuspended.send(None, event=unsuspension_event)

    done_message = gettext(
        "User '%(screen_name)s' has been unsuspended.",
        screen_name=user.screen_name,
    )
    flash_success(done_message)

    return redirect_to('.view', **back_to_profile)
381
382
383 1
@blueprint.get('/<uuid:user_id>/delete')
@permission_required('user.administrate')
@templated
def delete_account_form(user_id, erroneous_form=None):
    """Show form to delete the user account.

    Redirects to the user's admin view with an error message if the
    account has already been deleted. `erroneous_form` is a previously
    submitted form to show again instead of a blank one.
    """
    user = _get_user_for_admin_or_404(user_id)

    if user.deleted:
        message = gettext(
            "User '%(screen_name)s' has already been deleted.",
            screen_name=user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=user.id)

    form = erroneous_form or DeleteAccountForm()

    return dict(profile_user=user, user=user, form=form)
406
407
408 1
@blueprint.post('/<uuid:user_id>/delete')
@permission_required('user.administrate')
def delete_account(user_id):
    """Delete the user account for the reason given in the form.

    Refuses (with an error message) if the account has already been
    deleted, and shows the form again if it does not validate. Sends
    the `account_deleted` signal on success.
    """
    user = _get_user_or_404(user_id)
    back_to_profile = {'user_id': user.id}

    if user.deleted:
        already_message = gettext(
            "User '%(screen_name)s' has already been deleted.",
            screen_name=user.screen_name,
        )
        flash_error(already_message)
        return redirect_to('.view', **back_to_profile)

    form = DeleteAccountForm(request.form)
    if not form.validate():
        return delete_account_form(user.id, form)

    initiator_id = g.user.id
    reason = form.reason.data.strip()

    deletion_event = user_deletion_service.delete_account(
        user.id, initiator_id, reason
    )
    user_signals.account_deleted.send(None, event=deletion_event)

    done_message = gettext(
        "User '%(screen_name)s' has been deleted.",
        screen_name=user.screen_name,
    )
    flash_success(done_message)

    return redirect_to('.view', **back_to_profile)
442
443
444
# -------------------------------------------------------------------- #
445
# screen name
446
447
448 1
@blueprint.get('/<uuid:user_id>/change_screen_name')
@permission_required('user.administrate')
@templated
def change_screen_name_form(user_id, erroneous_form=None):
    """Show form to change the user's screen name.

    `erroneous_form` is a previously submitted form to show again
    instead of a blank one. Aborts with 404 if the user is unknown.
    """
    user = _get_user_for_admin_or_404(user_id)
    form = erroneous_form or ChangeScreenNameForm()

    return dict(profile_user=user, user=user, form=form)
462
463
464 1
@blueprint.post('/<uuid:user_id>/change_screen_name')
@permission_required('user.administrate')
def change_screen_name(user_id):
    """Rename the user to the screen name submitted with the form.

    Shows the form again if it does not validate. Sends the
    `screen_name_changed` signal on success and redirects to the
    user's admin view.
    """
    user = _get_user_or_404(user_id)

    form = ChangeScreenNameForm(request.form)
    if not form.validate():
        return change_screen_name_form(user.id, form)

    # Keep the current name for the success message.
    previous_name = user.screen_name
    requested_name = form.screen_name.data.strip()
    initiator_id = g.user.id
    reason = form.reason.data.strip()

    rename_event = user_command_service.change_screen_name(
        user.id, requested_name, initiator_id, reason=reason
    )
    user_signals.screen_name_changed.send(None, event=rename_event)

    message = gettext(
        "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.",
        old_screen_name=previous_name,
        new_screen_name=requested_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=user.id)
494
495
496
# -------------------------------------------------------------------- #
497
# email address
498
499
500 1
@blueprint.get('/<uuid:user_id>/change_email_address')
@permission_required('user.administrate')
@templated
def change_email_address_form(user_id, erroneous_form=None):
    """Show form to change the user's email address.

    `erroneous_form` is a previously submitted form to show again
    instead of a blank one. Aborts with 404 if the user is unknown.
    """
    user = _get_user_for_admin_or_404(user_id)
    form = erroneous_form or ChangeEmailAddressForm()

    return dict(profile_user=user, user=user, form=form)
514
515
516 1
@blueprint.post('/<uuid:user_id>/change_email_address')
@permission_required('user.administrate')
def change_email_address(user_id):
    """Change the user's email address.

    Shows the form again if it does not validate. The new address is
    stored as not verified. Sends the `email_address_changed` signal on
    success and redirects to the user's admin view. Aborts with 404 if
    the user is unknown.
    """
    user = _get_user_or_404(user_id)

    form = ChangeEmailAddressForm(request.form)
    if not form.validate():
        return change_email_address_form(user.id, form)

    # The previous address used to be looked up here but was never
    # used; the lookup has been dropped.
    new_email_address = form.email_address.data.strip()
    # An address set by an admin has not been confirmed by its owner.
    verified = False
    initiator_id = g.user.id
    reason = form.reason.data.strip()

    event = user_command_service.change_email_address(
        user.id, new_email_address, verified, initiator_id, reason=reason
    )

    user_signals.email_address_changed.send(None, event=event)

    flash_success(
        gettext(
            "Email address for user '%(screen_name)s' has been updated.",
            screen_name=user.screen_name,
        )
    )

    return redirect_to('.view', user_id=user.id)
546
547
548 1
@blueprint.get('/<uuid:user_id>/invalidate_email_address')
@permission_required('user.administrate')
@templated
def invalidate_email_address_form(user_id, erroneous_form=None):
    """Show form to invalidate the email address assigned with the account.

    Redirects to the user's admin view with an error message if the
    address is not marked as verified.
    """
    user = _get_user_for_admin_or_404(user_id)

    address_data = user_service.get_email_address_data(user_id)
    if not address_data.verified:
        flash_error(gettext('Email address is already invalidated.'))
        return redirect_to('.view', user_id=user.id)

    form = erroneous_form or InvalidateEmailAddressForm()

    return dict(profile_user=user, user=user, form=form)
567
568
569 1
@blueprint.post('/<uuid:user_id>/invalidate_email_address')
@permission_required('user.administrate')
def invalidate_email_address(user_id):
    """Invalidate the email address assigned with the account.

    Refuses (with an error message) if the address is not marked as
    verified, and shows the form again if it does not validate. Sends
    the `email_address_invalidated` signal on success.
    """
    user = _get_user_or_404(user_id)
    back_to_profile = {'user_id': user.id}

    address_data = user_service.get_email_address_data(user_id)
    if not address_data.verified:
        flash_error(gettext('Email address is already invalidated.'))
        return redirect_to('.view', **back_to_profile)

    form = InvalidateEmailAddressForm(request.form)
    if not form.validate():
        return invalidate_email_address_form(user.id, form)

    initiator_id = g.user.id
    reason = form.reason.data.strip()

    invalidation_event = (
        email_address_verification_service.invalidate_email_address(
            user.id, reason, initiator_id=initiator_id
        )
    )
    user_signals.email_address_invalidated.send(None, event=invalidation_event)

    message = gettext(
        "The email address of user '%(screen_name)s' has been invalidated.",
        screen_name=user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', **back_to_profile)
601
602
603
# -------------------------------------------------------------------- #
604
# details
605
606
607 1
@blueprint.get('/<uuid:user_id>/details')
@permission_required('user.administrate')
@templated
def change_details_form(user_id, erroneous_form=None):
    """Show a form to change the user's details.

    Without an `erroneous_form`, the form is built from the user's
    stored details. The country names are passed to the template as
    well.
    """
    user = _get_user_for_admin_or_404(user_id)

    # The stored details are fetched even if an erroneous form is
    # given and they end up unused.
    stored_detail = user_service.get_detail(user_id)

    form = erroneous_form or ChangeDetailsForm(obj=stored_detail)

    return {
        'profile_user': user,
        'user': user,
        'form': form,
        'country_names': country_service.get_country_names(),
    }
625
626
627 1
@blueprint.post('/<uuid:user_id>/details')
@permission_required('user.administrate')
def change_details(user_id):
    """Update the user's details with the submitted form values.

    Shows the form again if it does not validate. All text fields are
    stripped of surrounding whitespace; the date of birth is passed on
    as is. Sends the `details_updated` signal on success.
    """
    user = _get_user_or_404(user_id)

    form = ChangeDetailsForm(request.form)
    if not form.validate():
        return change_details_form(user.id, form)

    def stripped(field):
        # Submitted text without surrounding whitespace.
        return field.data.strip()

    update_event = user_command_service.update_user_details(
        user.id,
        stripped(form.first_names),
        stripped(form.last_name),
        form.date_of_birth.data,
        stripped(form.country),
        stripped(form.zip_code),
        stripped(form.city),
        stripped(form.street),
        stripped(form.phone_number),
        g.user.id,  # initiator_id
    )

    flash_success(gettext('Changes have been saved.'))

    user_signals.details_updated.send(None, event=update_event)

    return redirect_to('.view', user_id=user.id)
664
665
666
# -------------------------------------------------------------------- #
667
# authentication
668
669
670 1
@blueprint.get('/<uuid:user_id>/password')
@permission_required('user.set_password')
@templated
def set_password_form(user_id, erroneous_form=None):
    """Show a form to set a new password for the user.

    `erroneous_form` is a previously submitted form to show again
    instead of a blank one. Aborts with 404 if the user is unknown.
    """
    user = _get_user_for_admin_or_404(user_id)
    form = erroneous_form or SetPasswordForm()

    return dict(profile_user=user, user=user, form=form)
684
685
686 1
@blueprint.post('/<uuid:user_id>/password')
@permission_required('user.set_password')
def set_password(user_id):
    """Set a new password for the user.

    Shows the form again if it does not validate; otherwise updates
    the password hash on behalf of the current user and redirects to
    the user's admin view.
    """
    user = _get_user_or_404(user_id)

    form = SetPasswordForm(request.form)
    if not form.validate():
        return set_password_form(user.id, form)

    password_service.update_password_hash(
        user.id, form.password.data, g.user.id
    )

    message = gettext(
        "New password has been set for user '%(screen_name)s'.",
        screen_name=user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=user.id)
709
710
711
# -------------------------------------------------------------------- #
712
# authorization
713
714
715 1
@blueprint.get('/<uuid:user_id>/permissions')
@permission_required('user.view')
@templated
def view_permissions(user_id):
    """Show the user's permissions, grouped by role.

    Aborts with 404 if the user is unknown.
    """
    user = _get_user_for_admin_or_404(user_id)

    permission_ids_by_role = (
        authorization_service.get_permission_ids_by_role_for_user(user.id)
    )

    return {
        'profile_user': user,
        'user': user,
        'permissions_by_role': _index_permissions_by_role(
            permission_ids_by_role
        ),
    }
735
736
737 1
@blueprint.get('/<uuid:user_id>/roles/assignment')
@permission_required('role.assign')
@templated
def manage_roles(user_id):
    """Manage what roles are assigned to the user.

    Provides the permissions of every role as well as the IDs of the
    roles assigned to the user. Aborts with 404 if the user is unknown.
    """
    user = _get_user_for_admin_or_404(user_id)

    all_permission_ids_by_role = (
        authorization_service.get_permission_ids_by_role()
    )
    permissions_by_role = _index_permissions_by_role(
        all_permission_ids_by_role
    )

    assigned_role_ids = authorization_service.find_role_ids_for_user(user.id)

    return dict(
        profile_user=user,
        user=user,
        permissions_by_role=permissions_by_role,
        user_role_ids=assigned_role_ids,
    )
756
757
758 1
def _index_permissions_by_role(
    permission_ids_by_role: dict[Role, frozenset[PermissionID]]
) -> dict[Role, frozenset[Permission]]:
    """Replace each role's permission IDs with the registered permissions.

    IDs for which the permission registry holds no permission are
    silently left out.
    """
    registered = {}
    for permission in permission_registry.get_registered_permissions():
        registered[permission.id] = permission

    def resolve(permission_ids):
        # Map known IDs to their permission objects.
        return frozenset(
            registered[pid] for pid in permission_ids if pid in registered
        )

    permissions_by_role = {}
    for role, permission_ids in permission_ids_by_role.items():
        permissions_by_role[role] = resolve(permission_ids)

    return permissions_by_role
774
775
776 1
@blueprint.post('/<uuid:user_id>/roles/<role_id>')
@permission_required('role.assign')
@respond_no_content
def role_assign(user_id, role_id):
    """Assign the role to the user on behalf of the current user.

    Aborts with 404 if either the user or the role is unknown.
    """
    user = _get_user_or_404(user_id)
    role = _get_role_or_404(role_id)

    authorization_service.assign_role_to_user(
        role.id, user.id, initiator_id=g.user.id
    )

    message = gettext(
        '%(role_title)s has been assigned to "%(screen_name)s".',
        screen_name=user.screen_name,
        role_title=role.title,
    )
    flash_success(message)
796
797
798 1
@blueprint.delete('/<uuid:user_id>/roles/<role_id>')
@permission_required('role.assign')
@respond_no_content
def role_deassign(user_id, role_id):
    """Withdraw the role from the user on behalf of the current user.

    Aborts with 404 if either the user or the role is unknown.
    """
    user = _get_user_or_404(user_id)
    role = _get_role_or_404(role_id)

    authorization_service.deassign_role_from_user(
        role.id, user.id, initiator_id=g.user.id
    )

    message = gettext(
        '%(role_title)s has been withdrawn from "%(screen_name)s".',
        screen_name=user.screen_name,
        role_title=role.title,
    )
    flash_success(message)
818
819
820
# -------------------------------------------------------------------- #
821
# events
822
823
824 1
@blueprint.get('/<uuid:user_id>/events')
@permission_required('user.view')
@templated
def view_events(user_id):
    """Show the user's events.

    Login events are left out unless the `include_logins` query string
    argument is `yes`, which is also its default.
    """
    user = _get_user_for_admin_or_404(user_id)

    all_events = list(service.get_events(user.id))

    logins_included = (
        request.args.get('include_logins', default='yes') == 'yes'
    )
    if logins_included:
        shown_events = all_events
    else:
        shown_events = [
            entry for entry in all_events if entry['event'] != 'user-logged-in'
        ]

    return dict(
        profile_user=user,
        user=user,
        events=shown_events,
        logins_included=logins_included,
    )
845
846
847
# -------------------------------------------------------------------- #
848
# helpers
849
850
851 1
def _get_user_for_admin_or_404(user_id) -> UserForAdmin:
    """Return the admin representation of the user, or abort with 404."""
    found = user_service.find_user_for_admin(user_id)

    if found is not None:
        return found

    abort(404)
858
859
860 1
def _get_user_or_404(user_id):
    """Return the user with that ID, or abort with 404 if there is none."""
    found = user_service.find_user(user_id)

    if found is not None:
        return found

    abort(404)
867
868
869 1
def _get_role_or_404(role_id):
    """Return the role with that ID, or abort with 404 if there is none."""
    found = authorization_service.find_role(role_id)

    if found is not None:
        return found

    abort(404)
876