Total Complexity | 65 |
Total Lines | 880 |
Duplicated Lines | 12.95 % |
Coverage | 53.39% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like byceps.blueprints.admin.user.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | byceps.blueprints.admin.user.views |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2014-2022 Jochen Kupperschmidt |
||
6 | :License: Revised BSD (see `LICENSE` file for details) |
||
7 | """ |
||
8 | |||
9 | 1 | from __future__ import annotations |
|
10 | 1 | from datetime import datetime |
|
11 | 1 | from typing import Optional |
|
12 | |||
13 | 1 | from flask import abort, g, request |
|
14 | 1 | from flask_babel import gettext |
|
15 | |||
16 | 1 | from ....services.authentication.password import service as password_service |
|
17 | 1 | from ....services.authentication.session import service as session_service |
|
18 | 1 | from ....services.authorization import service as authorization_service |
|
19 | 1 | from ....services.authorization.transfer.models import ( |
|
20 | Role, |
||
21 | Permission, |
||
22 | PermissionID, |
||
23 | ) |
||
24 | 1 | from ....services.country import service as country_service |
|
25 | 1 | from ....services.orga_team import service as orga_team_service |
|
26 | 1 | from ....services.shop.order import service as order_service |
|
27 | 1 | from ....services.shop.shop import service as shop_service |
|
28 | 1 | from ....services.site import service as site_service |
|
29 | 1 | from ....services.user import ( |
|
30 | command_service as user_command_service, |
||
31 | creation_service as user_creation_service, |
||
32 | deletion_service as user_deletion_service, |
||
33 | email_address_service, |
||
34 | service as user_service, |
||
35 | stats_service as user_stats_service, |
||
36 | ) |
||
37 | 1 | from ....services.user.transfer.models import UserForAdmin, UserStateFilter |
|
38 | 1 | from ....services.user_badge import awarding_service as badge_awarding_service |
|
39 | 1 | from ....signals import user as user_signals |
|
40 | 1 | from ....util.authorization import permission_registry |
|
41 | 1 | from ....util.framework.blueprint import create_blueprint |
|
42 | 1 | from ....util.framework.flash import flash_error, flash_success |
|
43 | 1 | from ....util.framework.templating import templated |
|
44 | 1 | from ....util.views import permission_required, redirect_to, respond_no_content |
|
45 | |||
46 | 1 | from .forms import ( |
|
47 | ACCOUNT_DELETION_VERIFICATION_TEXT, |
||
48 | ChangeDetailsForm, |
||
49 | ChangeEmailAddressForm, |
||
50 | ChangeScreenNameForm, |
||
51 | CreateAccountForm, |
||
52 | DeleteAccountForm, |
||
53 | InvalidateEmailAddressForm, |
||
54 | SetPasswordForm, |
||
55 | SuspendAccountForm, |
||
56 | ) |
||
57 | 1 | from . import service |
|
58 | |||
59 | |||
60 | 1 | blueprint = create_blueprint('user_admin', __name__) |
|
61 | |||
62 | |||
63 | 1 | @blueprint.get('/', defaults={'page': 1}) |
|
64 | 1 | @blueprint.get('/pages/<int:page>') |
|
65 | 1 | @permission_required('user.view') |
|
66 | 1 | @templated |
|
67 | def index(page): |
||
68 | """List users.""" |
||
69 | 1 | per_page = request.args.get('per_page', type=int, default=20) |
|
70 | 1 | search_term = request.args.get('search_term', default='').strip() |
|
71 | 1 | only = request.args.get('only') |
|
72 | |||
73 | 1 | user_state_filter = UserStateFilter.__members__.get( |
|
74 | only, UserStateFilter.none |
||
75 | ) |
||
76 | |||
77 | 1 | users = user_service.get_users_paginated( |
|
78 | page, per_page, search_term=search_term, state_filter=user_state_filter |
||
79 | ) |
||
80 | |||
81 | 1 | total_active = user_stats_service.count_active_users() |
|
82 | 1 | total_uninitialized = user_stats_service.count_uninitialized_users() |
|
83 | 1 | total_suspended = user_stats_service.count_suspended_users() |
|
84 | 1 | total_deleted = user_stats_service.count_deleted_users() |
|
85 | 1 | total_overall = user_stats_service.count_users() |
|
86 | |||
87 | 1 | return { |
|
88 | 'users': users, |
||
89 | 'total_active': total_active, |
||
90 | 'total_uninitialized': total_uninitialized, |
||
91 | 'total_suspended': total_suspended, |
||
92 | 'total_deleted': total_deleted, |
||
93 | 'total_overall': total_overall, |
||
94 | 'search_term': search_term, |
||
95 | 'only': only, |
||
96 | 'UserStateFilter': UserStateFilter, |
||
97 | 'user_state_filter': user_state_filter, |
||
98 | } |
||
99 | |||
100 | |||
101 | 1 | @blueprint.get('/<uuid:user_id>') |
|
102 | 1 | @permission_required('user.view') |
|
103 | 1 | @templated |
|
104 | def view(user_id): |
||
105 | """Show a user's interal profile.""" |
||
106 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
107 | 1 | db_user = user_service.find_user_with_details(user.id) |
|
108 | |||
109 | 1 | recent_login = session_service.find_recent_login(user.id) |
|
110 | 1 | days_since_recent_login = _calculate_days_since(recent_login) |
|
111 | |||
112 | 1 | orga_activities = orga_team_service.get_orga_activities_for_user(user.id) |
|
113 | |||
114 | 1 | newsletter_subscription_states = list( |
|
115 | service.get_newsletter_subscription_states(user.id) |
||
116 | ) |
||
117 | 1 | newsletter_subscription_count = sum( |
|
118 | 1 for _, subscribed in newsletter_subscription_states if subscribed |
||
119 | ) |
||
120 | |||
121 | 1 | orders = order_service.get_orders_placed_by_user(user.id) |
|
122 | |||
123 | 1 | order_shop_ids = {order.shop_id for order in orders} |
|
124 | 1 | shops = shop_service.find_shops(order_shop_ids) |
|
125 | 1 | shops_by_id = {shop.id: shop for shop in shops} |
|
126 | |||
127 | 1 | parties_and_tickets = service.get_parties_and_tickets(user.id) |
|
128 | 1 | ticket_count = sum(len(tickets) for _, tickets in parties_and_tickets) |
|
129 | |||
130 | 1 | attended_parties = service.get_attended_parties(user.id) |
|
131 | |||
132 | 1 | badges_with_awarding_quantity = ( |
|
133 | badge_awarding_service.get_badges_awarded_to_user(user.id) |
||
134 | ) |
||
135 | 1 | badge_count = len(badges_with_awarding_quantity) |
|
136 | |||
137 | 1 | return { |
|
138 | 'profile_user': user, |
||
139 | 'user': db_user, |
||
140 | 'recent_login': recent_login, |
||
141 | 'days_since_recent_login': days_since_recent_login, |
||
142 | 'orga_activities': orga_activities, |
||
143 | 'newsletter_subscription_count': newsletter_subscription_count, |
||
144 | 'newsletter_subscription_states': newsletter_subscription_states, |
||
145 | 'orders': orders, |
||
146 | 'shops_by_id': shops_by_id, |
||
147 | 'parties_and_tickets': parties_and_tickets, |
||
148 | 'ticket_count': ticket_count, |
||
149 | 'attended_parties': attended_parties, |
||
150 | 'badge_count': badge_count, |
||
151 | 'badges_with_awarding_quantity': badges_with_awarding_quantity, |
||
152 | } |
||
153 | |||
154 | |||
155 | 1 | def _calculate_days_since(dt: Optional[datetime]) -> Optional[int]: |
|
156 | 1 | if dt is None: |
|
157 | 1 | return None |
|
158 | |||
159 | return (datetime.utcnow().date() - dt.date()).days |
||
160 | |||
161 | |||
162 | # -------------------------------------------------------------------- # |
||
163 | # account |
||
164 | |||
165 | |||
166 | 1 | @blueprint.get('/create') |
|
167 | 1 | @permission_required('user.create') |
|
168 | 1 | @templated |
|
169 | 1 | def create_account_form(erroneous_form=None): |
|
170 | """Show a form to create a user account.""" |
||
171 | 1 | form = erroneous_form if erroneous_form else CreateAccountForm() |
|
172 | 1 | form.set_site_choices() |
|
173 | |||
174 | 1 | return {'form': form} |
|
175 | |||
176 | |||
177 | 1 | @blueprint.post('/') |
|
178 | 1 | @permission_required('user.create') |
|
179 | def create_account(): |
||
180 | """Create a user account.""" |
||
181 | form = CreateAccountForm(request.form) |
||
182 | form.set_site_choices() |
||
183 | |||
184 | if not form.validate(): |
||
185 | return create_account_form(form) |
||
186 | |||
187 | screen_name = form.screen_name.data.strip() |
||
188 | first_name = form.first_name.data.strip() |
||
189 | last_name = form.last_name.data.strip() |
||
190 | email_address = form.email_address.data.lower() |
||
191 | password = form.password.data |
||
192 | site_id_for_email = form.site_id.data |
||
193 | |||
194 | if site_id_for_email: |
||
195 | site_for_email = site_service.get_site(site_id_for_email) |
||
196 | else: |
||
197 | site_for_email = None |
||
198 | |||
199 | initiator_id = g.user.id |
||
200 | |||
201 | try: |
||
202 | user, event = user_creation_service.create_user( |
||
203 | screen_name, |
||
204 | email_address, |
||
205 | password, |
||
206 | first_name=first_name, |
||
207 | last_name=last_name, |
||
208 | creator_id=initiator_id, |
||
209 | # Do not pass site ID here; the account is not created on a site. |
||
210 | ) |
||
211 | except user_creation_service.UserCreationFailed: |
||
212 | flash_error( |
||
213 | gettext( |
||
214 | 'User "%(screen_name)s" could not be created.', |
||
215 | screen_name=screen_name, |
||
216 | ) |
||
217 | ) |
||
218 | return create_account_form(form) |
||
219 | |||
220 | flash_success( |
||
221 | gettext( |
||
222 | 'User "%(screen_name)s" has been created.', |
||
223 | screen_name=user.screen_name, |
||
224 | ) |
||
225 | ) |
||
226 | |||
227 | if site_for_email: |
||
228 | user_creation_service.request_email_address_confirmation( |
||
229 | user, email_address, site_for_email.id |
||
230 | ) |
||
231 | flash_success( |
||
232 | gettext('An email has been sent to the corresponding address.'), |
||
233 | icon='email', |
||
234 | ) |
||
235 | |||
236 | user_signals.account_created.send(None, event=event) |
||
237 | |||
238 | return redirect_to('.view', user_id=user.id) |
||
239 | |||
240 | |||
241 | 1 | @blueprint.post('/<uuid:user_id>/initialize') |
|
242 | 1 | @permission_required('user.administrate') |
|
243 | 1 | @respond_no_content |
|
244 | def initialize_account(user_id): |
||
245 | """Initialize the user account.""" |
||
246 | user = _get_user_or_404(user_id) |
||
247 | |||
248 | initiator_id = g.user.id |
||
249 | |||
250 | user_command_service.initialize_account(user.id, initiator_id=initiator_id) |
||
251 | |||
252 | flash_success( |
||
253 | gettext( |
||
254 | "User '%(screen_name)s' has been initialized.", |
||
255 | screen_name=user.screen_name, |
||
256 | ) |
||
257 | ) |
||
258 | |||
259 | |||
260 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/suspend') |
261 | 1 | @permission_required('user.administrate') |
|
262 | 1 | @templated |
|
263 | 1 | def suspend_account_form(user_id, erroneous_form=None): |
|
264 | """Show form to suspend the user account.""" |
||
265 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
266 | |||
267 | 1 | if user.suspended: |
|
268 | 1 | flash_error( |
|
269 | gettext( |
||
270 | "User '%(screen_name)s' is already suspended.", |
||
271 | screen_name=user.screen_name, |
||
272 | ) |
||
273 | ) |
||
274 | 1 | return redirect_to('.view', user_id=user.id) |
|
275 | |||
276 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
277 | |||
278 | 1 | return { |
|
279 | 'profile_user': user, |
||
280 | 'user': user, |
||
281 | 'form': form, |
||
282 | } |
||
283 | |||
284 | |||
285 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/suspend') |
286 | 1 | @permission_required('user.administrate') |
|
287 | def suspend_account(user_id): |
||
288 | """Suspend the user account.""" |
||
289 | user = _get_user_or_404(user_id) |
||
290 | |||
291 | if user.suspended: |
||
292 | flash_error( |
||
293 | gettext( |
||
294 | "User '%(screen_name)s' is already suspended.", |
||
295 | screen_name=user.screen_name, |
||
296 | ) |
||
297 | ) |
||
298 | return redirect_to('.view', user_id=user.id) |
||
299 | |||
300 | form = SuspendAccountForm(request.form) |
||
301 | if not form.validate(): |
||
302 | return suspend_account_form(user.id, form) |
||
303 | |||
304 | initiator_id = g.user.id |
||
305 | reason = form.reason.data.strip() |
||
306 | |||
307 | event = user_command_service.suspend_account(user.id, initiator_id, reason) |
||
308 | |||
309 | user_signals.account_suspended.send(None, event=event) |
||
310 | |||
311 | flash_success( |
||
312 | gettext( |
||
313 | "User '%(screen_name)s' has been suspended.", |
||
314 | screen_name=user.screen_name, |
||
315 | ) |
||
316 | ) |
||
317 | |||
318 | return redirect_to('.view', user_id=user.id) |
||
319 | |||
320 | |||
321 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/unsuspend') |
322 | 1 | @permission_required('user.administrate') |
|
323 | 1 | @templated |
|
324 | 1 | def unsuspend_account_form(user_id, erroneous_form=None): |
|
325 | """Show form to unsuspend the user account.""" |
||
326 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
327 | |||
328 | 1 | if not user.suspended: |
|
329 | 1 | flash_error( |
|
330 | gettext( |
||
331 | "User '%(screen_name)s' is not suspended.", |
||
332 | screen_name=user.screen_name, |
||
333 | ) |
||
334 | ) |
||
335 | 1 | return redirect_to('.view', user_id=user.id) |
|
336 | |||
337 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
338 | |||
339 | 1 | return { |
|
340 | 'profile_user': user, |
||
341 | 'user': user, |
||
342 | 'form': form, |
||
343 | } |
||
344 | |||
345 | |||
346 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/unsuspend') |
347 | 1 | @permission_required('user.administrate') |
|
348 | def unsuspend_account(user_id): |
||
349 | """Unsuspend the user account.""" |
||
350 | user = _get_user_or_404(user_id) |
||
351 | |||
352 | if not user.suspended: |
||
353 | flash_error( |
||
354 | gettext( |
||
355 | "User '%(screen_name)s' is not suspended.", |
||
356 | screen_name=user.screen_name, |
||
357 | ) |
||
358 | ) |
||
359 | return redirect_to('.view', user_id=user.id) |
||
360 | |||
361 | form = SuspendAccountForm(request.form) |
||
362 | if not form.validate(): |
||
363 | return unsuspend_account_form(user.id, form) |
||
364 | |||
365 | initiator_id = g.user.id |
||
366 | reason = form.reason.data.strip() |
||
367 | |||
368 | event = user_command_service.unsuspend_account( |
||
369 | user.id, initiator_id, reason |
||
370 | ) |
||
371 | |||
372 | user_signals.account_unsuspended.send(None, event=event) |
||
373 | |||
374 | flash_success( |
||
375 | gettext( |
||
376 | "User '%(screen_name)s' has been unsuspended.", |
||
377 | screen_name=user.screen_name, |
||
378 | ) |
||
379 | ) |
||
380 | |||
381 | return redirect_to('.view', user_id=user.id) |
||
382 | |||
383 | |||
384 | 1 | @blueprint.get('/<uuid:user_id>/delete') |
|
385 | 1 | @permission_required('user.administrate') |
|
386 | 1 | @templated |
|
387 | 1 | def delete_account_form(user_id, erroneous_form=None): |
|
388 | """Show form to delete the user account.""" |
||
389 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
390 | |||
391 | 1 | if user.deleted: |
|
392 | 1 | flash_error( |
|
393 | gettext( |
||
394 | "User '%(screen_name)s' has already been deleted.", |
||
395 | screen_name=user.screen_name, |
||
396 | ) |
||
397 | ) |
||
398 | 1 | return redirect_to('.view', user_id=user.id) |
|
399 | |||
400 | 1 | form = erroneous_form if erroneous_form else DeleteAccountForm() |
|
401 | |||
402 | 1 | return { |
|
403 | 'profile_user': user, |
||
404 | 'user': user, |
||
405 | 'form': form, |
||
406 | 'verification_text': ACCOUNT_DELETION_VERIFICATION_TEXT, |
||
407 | } |
||
408 | |||
409 | |||
410 | 1 | @blueprint.post('/<uuid:user_id>/delete') |
|
411 | 1 | @permission_required('user.administrate') |
|
412 | def delete_account(user_id): |
||
413 | """Delete the user account.""" |
||
414 | user = _get_user_or_404(user_id) |
||
415 | |||
416 | if user.deleted: |
||
417 | flash_error( |
||
418 | gettext( |
||
419 | "User '%(screen_name)s' has already been deleted.", |
||
420 | screen_name=user.screen_name, |
||
421 | ) |
||
422 | ) |
||
423 | return redirect_to('.view', user_id=user.id) |
||
424 | |||
425 | form = DeleteAccountForm(request.form) |
||
426 | if not form.validate(): |
||
427 | return delete_account_form(user.id, form) |
||
428 | |||
429 | initiator_id = g.user.id |
||
430 | reason = form.reason.data.strip() |
||
431 | |||
432 | event = user_deletion_service.delete_account(user.id, initiator_id, reason) |
||
433 | |||
434 | user_signals.account_deleted.send(None, event=event) |
||
435 | |||
436 | flash_success( |
||
437 | gettext( |
||
438 | "User '%(screen_name)s' has been deleted.", |
||
439 | screen_name=user.screen_name, |
||
440 | ) |
||
441 | ) |
||
442 | |||
443 | return redirect_to('.view', user_id=user.id) |
||
444 | |||
445 | |||
446 | # -------------------------------------------------------------------- # |
||
447 | # screen name |
||
448 | |||
449 | |||
450 | 1 | @blueprint.get('/<uuid:user_id>/change_screen_name') |
|
451 | 1 | @permission_required('user.administrate') |
|
452 | 1 | @templated |
|
453 | 1 | def change_screen_name_form(user_id, erroneous_form=None): |
|
454 | """Show form to change the user's screen name.""" |
||
455 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
456 | |||
457 | 1 | form = erroneous_form if erroneous_form else ChangeScreenNameForm() |
|
458 | |||
459 | 1 | return { |
|
460 | 'profile_user': user, |
||
461 | 'user': user, |
||
462 | 'form': form, |
||
463 | } |
||
464 | |||
465 | |||
466 | 1 | @blueprint.post('/<uuid:user_id>/change_screen_name') |
|
467 | 1 | @permission_required('user.administrate') |
|
468 | def change_screen_name(user_id): |
||
469 | """Change the user's screen name.""" |
||
470 | user = _get_user_or_404(user_id) |
||
471 | |||
472 | form = ChangeScreenNameForm(request.form) |
||
473 | if not form.validate(): |
||
474 | return change_screen_name_form(user.id, form) |
||
475 | |||
476 | old_screen_name = user.screen_name |
||
477 | new_screen_name = form.screen_name.data.strip() |
||
478 | initiator_id = g.user.id |
||
479 | reason = form.reason.data.strip() |
||
480 | |||
481 | event = user_command_service.change_screen_name( |
||
482 | user.id, new_screen_name, initiator_id, reason=reason |
||
483 | ) |
||
484 | |||
485 | user_signals.screen_name_changed.send(None, event=event) |
||
486 | |||
487 | flash_success( |
||
488 | gettext( |
||
489 | "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.", |
||
490 | old_screen_name=old_screen_name, |
||
491 | new_screen_name=new_screen_name, |
||
492 | ) |
||
493 | ) |
||
494 | |||
495 | return redirect_to('.view', user_id=user.id) |
||
496 | |||
497 | |||
498 | # -------------------------------------------------------------------- # |
||
499 | # email address |
||
500 | |||
501 | |||
502 | 1 | @blueprint.get('/<uuid:user_id>/change_email_address') |
|
503 | 1 | @permission_required('user.administrate') |
|
504 | 1 | @templated |
|
505 | 1 | def change_email_address_form(user_id, erroneous_form=None): |
|
506 | """Show form to change the user's e-mail address.""" |
||
507 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
508 | |||
509 | 1 | form = erroneous_form if erroneous_form else ChangeEmailAddressForm() |
|
510 | |||
511 | 1 | return { |
|
512 | 'profile_user': user, |
||
513 | 'user': user, |
||
514 | 'form': form, |
||
515 | } |
||
516 | |||
517 | |||
518 | 1 | @blueprint.post('/<uuid:user_id>/change_email_address') |
|
519 | 1 | @permission_required('user.administrate') |
|
520 | def change_email_address(user_id): |
||
521 | """Change the user's e-mail address.""" |
||
522 | user = _get_user_or_404(user_id) |
||
523 | |||
524 | form = ChangeEmailAddressForm(request.form) |
||
525 | if not form.validate(): |
||
526 | return change_email_address_form(user.id, form) |
||
527 | |||
528 | old_email_address = user_service.find_email_address(user.id) |
||
529 | new_email_address = form.email_address.data.strip() |
||
530 | verified = False |
||
531 | initiator_id = g.user.id |
||
532 | reason = form.reason.data.strip() |
||
533 | |||
534 | event = user_command_service.change_email_address( |
||
535 | user.id, new_email_address, verified, initiator_id, reason=reason |
||
536 | ) |
||
537 | |||
538 | user_signals.email_address_changed.send(None, event=event) |
||
539 | |||
540 | flash_success( |
||
541 | gettext( |
||
542 | "Email address for user '%(screen_name)s' has been updated.", |
||
543 | screen_name=user.screen_name, |
||
544 | ) |
||
545 | ) |
||
546 | |||
547 | return redirect_to('.view', user_id=user.id) |
||
548 | |||
549 | |||
550 | 1 | @blueprint.get('/<uuid:user_id>/invalidate_email_address') |
|
551 | 1 | @permission_required('user.administrate') |
|
552 | 1 | @templated |
|
553 | 1 | def invalidate_email_address_form(user_id, erroneous_form=None): |
|
554 | """Show form to invalidate the email address assigned with the account.""" |
||
555 | user = _get_user_for_admin_or_404(user_id) |
||
556 | |||
557 | email_address = user_service.get_email_address_data(user_id) |
||
558 | if not email_address.verified: |
||
559 | flash_error(gettext('Email address is already invalidated.')) |
||
560 | return redirect_to('.view', user_id=user.id) |
||
561 | |||
562 | form = erroneous_form if erroneous_form else InvalidateEmailAddressForm() |
||
563 | |||
564 | return { |
||
565 | 'profile_user': user, |
||
566 | 'user': user, |
||
567 | 'form': form, |
||
568 | } |
||
569 | |||
570 | |||
571 | 1 | @blueprint.post('/<uuid:user_id>/invalidate_email_address') |
|
572 | 1 | @permission_required('user.administrate') |
|
573 | def invalidate_email_address(user_id): |
||
574 | """Invalidate the email address assigned with the account.""" |
||
575 | user = _get_user_or_404(user_id) |
||
576 | |||
577 | email_address = user_service.get_email_address_data(user_id) |
||
578 | if not email_address.verified: |
||
579 | flash_error(gettext('Email address is already invalidated.')) |
||
580 | return redirect_to('.view', user_id=user.id) |
||
581 | |||
582 | form = InvalidateEmailAddressForm(request.form) |
||
583 | if not form.validate(): |
||
584 | return invalidate_email_address_form(user.id, form) |
||
585 | |||
586 | initiator_id = g.user.id |
||
587 | reason = form.reason.data.strip() |
||
588 | |||
589 | event = email_address_service.invalidate_email_address( |
||
590 | user.id, reason, initiator_id=initiator_id |
||
591 | ) |
||
592 | |||
593 | user_signals.email_address_invalidated.send(None, event=event) |
||
594 | |||
595 | flash_success( |
||
596 | gettext( |
||
597 | "The email address of user '%(screen_name)s' has been invalidated.", |
||
598 | screen_name=user.screen_name, |
||
599 | ) |
||
600 | ) |
||
601 | |||
602 | return redirect_to('.view', user_id=user.id) |
||
603 | |||
604 | |||
605 | # -------------------------------------------------------------------- # |
||
606 | # details |
||
607 | |||
608 | |||
609 | 1 | @blueprint.get('/<uuid:user_id>/details') |
|
610 | 1 | @permission_required('user.administrate') |
|
611 | 1 | @templated |
|
612 | 1 | def change_details_form(user_id, erroneous_form=None): |
|
613 | """Show a form to change the user's details.""" |
||
614 | user = _get_user_for_admin_or_404(user_id) |
||
615 | |||
616 | detail = user_service.get_detail(user_id) |
||
617 | |||
618 | form = erroneous_form if erroneous_form else ChangeDetailsForm(obj=detail) |
||
619 | country_names = country_service.get_country_names() |
||
620 | |||
621 | return { |
||
622 | 'profile_user': user, |
||
623 | 'user': user, |
||
624 | 'form': form, |
||
625 | 'country_names': country_names, |
||
626 | } |
||
627 | |||
628 | |||
629 | 1 | @blueprint.post('/<uuid:user_id>/details') |
|
630 | 1 | @permission_required('user.administrate') |
|
631 | def change_details(user_id): |
||
632 | """Change the user's details.""" |
||
633 | user = _get_user_or_404(user_id) |
||
634 | |||
635 | form = ChangeDetailsForm(request.form) |
||
636 | if not form.validate(): |
||
637 | return change_details_form(user.id, form) |
||
638 | |||
639 | first_name = form.first_name.data.strip() |
||
640 | last_name = form.last_name.data.strip() |
||
641 | date_of_birth = form.date_of_birth.data |
||
642 | country = form.country.data.strip() |
||
643 | zip_code = form.zip_code.data.strip() |
||
644 | city = form.city.data.strip() |
||
645 | street = form.street.data.strip() |
||
646 | phone_number = form.phone_number.data.strip() |
||
647 | |||
648 | event = user_command_service.update_user_details( |
||
649 | user.id, |
||
650 | first_name, |
||
651 | last_name, |
||
652 | date_of_birth, |
||
653 | country, |
||
654 | zip_code, |
||
655 | city, |
||
656 | street, |
||
657 | phone_number, |
||
658 | g.user.id, # initiator_id |
||
659 | ) |
||
660 | |||
661 | flash_success(gettext('Changes have been saved.')) |
||
662 | |||
663 | user_signals.details_updated.send(None, event=event) |
||
664 | |||
665 | return redirect_to('.view', user_id=user.id) |
||
666 | |||
667 | |||
668 | # -------------------------------------------------------------------- # |
||
669 | # authentication |
||
670 | |||
671 | |||
672 | 1 | @blueprint.get('/<uuid:user_id>/password') |
|
673 | 1 | @permission_required('user.set_password') |
|
674 | 1 | @templated |
|
675 | 1 | def set_password_form(user_id, erroneous_form=None): |
|
676 | """Show a form to set a new password for the user.""" |
||
677 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
678 | |||
679 | 1 | form = erroneous_form if erroneous_form else SetPasswordForm() |
|
680 | |||
681 | 1 | return { |
|
682 | 'profile_user': user, |
||
683 | 'user': user, |
||
684 | 'form': form, |
||
685 | } |
||
686 | |||
687 | |||
688 | 1 | @blueprint.post('/<uuid:user_id>/password') |
|
689 | 1 | @permission_required('user.set_password') |
|
690 | def set_password(user_id): |
||
691 | """Set a new password for the user.""" |
||
692 | user = _get_user_or_404(user_id) |
||
693 | |||
694 | form = SetPasswordForm(request.form) |
||
695 | if not form.validate(): |
||
696 | return set_password_form(user.id, form) |
||
697 | |||
698 | new_password = form.password.data |
||
699 | initiator_id = g.user.id |
||
700 | |||
701 | password_service.update_password_hash(user.id, new_password, initiator_id) |
||
702 | |||
703 | flash_success( |
||
704 | gettext( |
||
705 | "New password has been set for user '%(screen_name)s'.", |
||
706 | screen_name=user.screen_name, |
||
707 | ) |
||
708 | ) |
||
709 | |||
710 | return redirect_to('.view', user_id=user.id) |
||
711 | |||
712 | |||
713 | # -------------------------------------------------------------------- # |
||
714 | # authorization |
||
715 | |||
716 | |||
717 | 1 | @blueprint.get('/<uuid:user_id>/permissions') |
|
718 | 1 | @permission_required('user.view') |
|
719 | 1 | @templated |
|
720 | def view_permissions(user_id): |
||
721 | """Show user's permissions.""" |
||
722 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
723 | |||
724 | 1 | user_permission_ids_by_role = ( |
|
725 | authorization_service.get_permission_ids_by_role_for_user(user.id) |
||
726 | ) |
||
727 | |||
728 | 1 | permissions_by_role = _index_permissions_by_role( |
|
729 | user_permission_ids_by_role |
||
730 | ) |
||
731 | |||
732 | 1 | return { |
|
733 | 'profile_user': user, |
||
734 | 'user': user, |
||
735 | 'permissions_by_role': permissions_by_role, |
||
736 | } |
||
737 | |||
738 | |||
739 | 1 | @blueprint.get('/<uuid:user_id>/roles/assignment') |
|
740 | 1 | @permission_required('role.assign') |
|
741 | 1 | @templated |
|
742 | def manage_roles(user_id): |
||
743 | """Manage what roles are assigned to the user.""" |
||
744 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
745 | |||
746 | 1 | permission_ids_by_role = authorization_service.get_permission_ids_by_role() |
|
747 | |||
748 | 1 | permissions_by_role = _index_permissions_by_role(permission_ids_by_role) |
|
749 | |||
750 | 1 | user_role_ids = authorization_service.find_role_ids_for_user(user.id) |
|
751 | |||
752 | 1 | return { |
|
753 | 'profile_user': user, |
||
754 | 'user': user, |
||
755 | 'permissions_by_role': permissions_by_role, |
||
756 | 'user_role_ids': user_role_ids, |
||
757 | } |
||
758 | |||
759 | |||
760 | 1 | def _index_permissions_by_role( |
|
761 | permission_ids_by_role: dict[Role, frozenset[PermissionID]] |
||
762 | ) -> dict[Role, frozenset[Permission]]: |
||
763 | 1 | registered_permissions_by_id = { |
|
764 | permission.id: permission |
||
765 | for permission in permission_registry.get_registered_permissions() |
||
766 | } |
||
767 | |||
768 | 1 | return { |
|
769 | role: frozenset( |
||
770 | registered_permissions_by_id[permission_id] |
||
771 | for permission_id in permission_ids |
||
772 | if permission_id in registered_permissions_by_id |
||
773 | ) |
||
774 | for role, permission_ids in permission_ids_by_role.items() |
||
775 | } |
||
776 | |||
777 | |||
778 | 1 | @blueprint.post('/<uuid:user_id>/roles/<role_id>') |
|
779 | 1 | @permission_required('role.assign') |
|
780 | 1 | @respond_no_content |
|
781 | def role_assign(user_id, role_id): |
||
782 | """Assign the role to the user.""" |
||
783 | user = _get_user_or_404(user_id) |
||
784 | role = _get_role_or_404(role_id) |
||
785 | initiator_id = g.user.id |
||
786 | |||
787 | authorization_service.assign_role_to_user( |
||
788 | role.id, user.id, initiator_id=initiator_id |
||
789 | ) |
||
790 | |||
791 | flash_success( |
||
792 | gettext( |
||
793 | '%(role_title)s has been assigned to "%(screen_name)s".', |
||
794 | screen_name=user.screen_name, |
||
795 | role_title=role.title, |
||
796 | ) |
||
797 | ) |
||
798 | |||
799 | |||
800 | 1 | @blueprint.delete('/<uuid:user_id>/roles/<role_id>') |
|
801 | 1 | @permission_required('role.assign') |
|
802 | 1 | @respond_no_content |
|
803 | def role_deassign(user_id, role_id): |
||
804 | """Deassign the role from the user.""" |
||
805 | user = _get_user_or_404(user_id) |
||
806 | role = _get_role_or_404(role_id) |
||
807 | initiator_id = g.user.id |
||
808 | |||
809 | authorization_service.deassign_role_from_user( |
||
810 | role.id, user.id, initiator_id=initiator_id |
||
811 | ) |
||
812 | |||
813 | flash_success( |
||
814 | gettext( |
||
815 | '%(role_title)s has been withdrawn from "%(screen_name)s".', |
||
816 | screen_name=user.screen_name, |
||
817 | role_title=role.title, |
||
818 | ) |
||
819 | ) |
||
820 | |||
821 | |||
822 | # -------------------------------------------------------------------- # |
||
823 | # events |
||
824 | |||
825 | |||
826 | 1 | @blueprint.get('/<uuid:user_id>/events') |
|
827 | 1 | @permission_required('user.view') |
|
828 | 1 | @templated |
|
829 | def view_events(user_id): |
||
830 | """Show user's events.""" |
||
831 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
832 | |||
833 | 1 | log_entries = list(service.get_log_entries(user.id)) |
|
834 | |||
835 | 1 | include_logins = request.args.get('include_logins', default='yes') == 'yes' |
|
836 | 1 | if not include_logins: |
|
837 | log_entries = [ |
||
838 | entry |
||
839 | for entry in log_entries |
||
840 | if entry['event_type'] != 'user-logged-in' |
||
841 | ] |
||
842 | |||
843 | 1 | return { |
|
844 | 'profile_user': user, |
||
845 | 'user': user, |
||
846 | 'log_entries': log_entries, |
||
847 | 'logins_included': include_logins, |
||
848 | } |
||
849 | |||
850 | |||
851 | # -------------------------------------------------------------------- # |
||
852 | # helpers |
||
853 | |||
854 | |||
855 | 1 | def _get_user_for_admin_or_404(user_id) -> UserForAdmin: |
|
856 | 1 | user = user_service.find_user_for_admin(user_id) |
|
857 | |||
858 | 1 | if user is None: |
|
859 | abort(404) |
||
860 | |||
861 | 1 | return user |
|
862 | |||
863 | |||
864 | 1 | def _get_user_or_404(user_id): |
|
865 | user = user_service.find_user(user_id) |
||
866 | |||
867 | if user is None: |
||
868 | abort(404) |
||
869 | |||
870 | return user |
||
871 | |||
872 | |||
873 | 1 | def _get_role_or_404(role_id): |
|
874 | role = authorization_service.find_role(role_id) |
||
875 | |||
876 | if role is None: |
||
877 | abort(404) |
||
878 | |||
879 | return role |
||
880 |