Total Complexity | 66 |
Total Lines | 893 |
Duplicated Lines | 12.54 % |
Coverage | 55.05% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like byceps.blueprints.admin.user.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | byceps.blueprints.admin.user.views |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2014-2023 Jochen Kupperschmidt |
||
6 | :License: Revised BSD (see `LICENSE` file for details) |
||
7 | """ |
||
8 | |||
9 | 1 | from __future__ import annotations |
|
10 | |||
11 | 1 | from datetime import datetime |
|
12 | |||
13 | 1 | from flask import abort, g, request |
|
14 | 1 | from flask_babel import gettext |
|
15 | |||
16 | 1 | from byceps.services.authn.password import authn_password_service |
|
17 | 1 | from byceps.services.authn.session import authn_session_service |
|
18 | 1 | from byceps.services.authz import authz_service |
|
19 | 1 | from byceps.services.authz.models import Permission, PermissionID, Role |
|
20 | 1 | from byceps.services.country import country_service |
|
21 | 1 | from byceps.services.orga_team import orga_team_service |
|
22 | 1 | from byceps.services.shop.order import order_service |
|
23 | 1 | from byceps.services.shop.shop import shop_service |
|
24 | 1 | from byceps.services.site import site_service |
|
25 | 1 | from byceps.services.user import ( |
|
26 | user_command_service, |
||
27 | user_creation_service, |
||
28 | user_deletion_service, |
||
29 | user_email_address_service, |
||
30 | user_service, |
||
31 | user_stats_service, |
||
32 | ) |
||
33 | 1 | from byceps.services.user.models.user import UserForAdmin, UserStateFilter |
|
34 | 1 | from byceps.services.user_badge import user_badge_awarding_service |
|
35 | 1 | from byceps.signals import authn as authn_signals |
|
36 | 1 | from byceps.signals import authz as authz_signals |
|
37 | 1 | from byceps.signals import user as user_signals |
|
38 | 1 | from byceps.util.authz import permission_registry |
|
39 | 1 | from byceps.util.framework.blueprint import create_blueprint |
|
40 | 1 | from byceps.util.framework.flash import flash_error, flash_success |
|
41 | 1 | from byceps.util.framework.templating import templated |
|
42 | 1 | from byceps.util.views import ( |
|
43 | permission_required, |
||
44 | redirect_to, |
||
45 | respond_no_content, |
||
46 | ) |
||
47 | |||
48 | 1 | from . import service |
|
49 | 1 | from .forms import ( |
|
50 | ACCOUNT_DELETION_VERIFICATION_TEXT, |
||
51 | ChangeDetailsForm, |
||
52 | ChangeEmailAddressForm, |
||
53 | ChangeScreenNameForm, |
||
54 | CreateAccountForm, |
||
55 | DeleteAccountForm, |
||
56 | InvalidateEmailAddressForm, |
||
57 | SetPasswordForm, |
||
58 | SuspendAccountForm, |
||
59 | ) |
||
60 | |||
61 | |||
# Blueprint on which the user administration views of this module are
# registered (via the `@blueprint.get/post/delete` decorators).
blueprint = create_blueprint('user_admin', __name__)
|
63 | |||
64 | |||
@blueprint.get('/', defaults={'page': 1})
@blueprint.get('/pages/<int:page>')
@permission_required('user.view')
@templated
def index(page):
    """Show one page of the user list together with per-state totals.

    The request's query string supplies ``per_page`` (integer, defaults
    to 20), ``search_term`` (surrounding whitespace removed) and
    ``only`` (looked up by name among the ``UserStateFilter`` members).
    """
    query_args = request.args
    per_page = query_args.get('per_page', type=int, default=20)
    search_term = query_args.get('search_term', default='').strip()
    only = query_args.get('only')

    # A missing or unknown name falls back to `UserStateFilter.none`.
    user_state_filter = UserStateFilter.__members__.get(
        only, UserStateFilter.none
    )

    return {
        'users': user_service.get_users_paginated(
            page,
            per_page,
            search_term=search_term,
            state_filter=user_state_filter,
        ),
        'total_active': user_stats_service.count_active_users(),
        'total_uninitialized': user_stats_service.count_uninitialized_users(),
        'total_suspended': user_stats_service.count_suspended_users(),
        'total_deleted': user_stats_service.count_deleted_users(),
        'total_overall': user_stats_service.count_users(),
        'search_term': search_term,
        'only': only,
        'UserStateFilter': UserStateFilter,
        'user_state_filter': user_state_filter,
    }
||
101 | |||
102 | |||
@blueprint.get('/<uuid:user_id>')
@permission_required('user.view')
@templated
def view(user_id):
    """Show a user's internal profile.

    Collects the user's details, most recent login, orga activities,
    newsletter subscription states, orders (with their shops), tickets,
    attended parties and awarded badges for the template.
    """
    profile_user = _get_user_for_admin_or_404(user_id)
    uid = profile_user.id

    db_user = user_service.find_user_with_details(uid)

    recent_login = authn_session_service.find_recent_login(uid)
    days_since_recent_login = _calculate_days_since(recent_login)

    orga_activities = orga_team_service.get_orga_activities_for_user(uid)

    # Materialize the states; they are counted here and passed on as well.
    subscription_states = list(service.get_newsletter_subscription_states(uid))
    subscription_count = sum(
        bool(subscribed) for _, subscribed in subscription_states
    )

    orders = order_service.get_orders_placed_by_user(uid)

    # Look up every shop referenced by the orders exactly once.
    shops = shop_service.find_shops({order.shop_id for order in orders})
    shops_by_id = {}
    for shop in shops:
        shops_by_id[shop.id] = shop

    parties_and_tickets = service.get_parties_and_tickets(uid)
    ticket_count = 0
    for _party, tickets in parties_and_tickets:
        ticket_count += len(tickets)

    attended_parties = service.get_attended_parties(uid)

    awarded_badges = user_badge_awarding_service.get_badges_awarded_to_user(
        uid
    )

    return {
        'profile_user': profile_user,
        'user': db_user,
        'recent_login': recent_login,
        'days_since_recent_login': days_since_recent_login,
        'orga_activities': orga_activities,
        'newsletter_subscription_count': subscription_count,
        'newsletter_subscription_states': subscription_states,
        'orders': orders,
        'shops_by_id': shops_by_id,
        'parties_and_tickets': parties_and_tickets,
        'ticket_count': ticket_count,
        'attended_parties': attended_parties,
        'badge_count': len(awarded_badges),
        'badges_with_awarding_quantity': awarded_badges,
    }
||
155 | |||
156 | |||
157 | 1 | def _calculate_days_since(dt: datetime | None) -> int | None: |
|
158 | 1 | if dt is None: |
|
159 | 1 | return None |
|
160 | |||
161 | return (datetime.utcnow().date() - dt.date()).days |
||
162 | |||
163 | |||
164 | # -------------------------------------------------------------------- # |
||
165 | # account |
||
166 | |||
167 | |||
@blueprint.get('/create')
@permission_required('user.create')
@templated
def create_account_form(erroneous_form=None):
    """Render the form for creating a user account.

    A previously submitted form can be passed in as `erroneous_form`
    to be shown again instead of a fresh one.
    """
    form = erroneous_form or CreateAccountForm()
    form.set_site_choices()

    return {'form': form}
|
177 | |||
178 | |||
@blueprint.post('/')
@permission_required('user.create')
def create_account():
    """Create a user account from the submitted form.

    If validation fails or the creation service reports an error, the
    creation form is rendered again. On success, a confirmation email
    is requested if a site was selected in the form, the
    `account_created` signal is sent, and the client is redirected to
    the new user's profile view.
    """
    form = CreateAccountForm(request.form)
    form.set_site_choices()

    if not form.validate():
        return create_account_form(form)

    screen_name = form.screen_name.data.strip()
    first_name = form.first_name.data.strip()
    last_name = form.last_name.data.strip()
    email_address = form.email_address.data.lower()
    password = form.password.data
    site_id_for_email = form.site_id.data

    # The site is only used for requesting the confirmation email.
    site_for_email = (
        site_service.get_site(site_id_for_email) if site_id_for_email else None
    )

    creation_result = user_creation_service.create_user(
        screen_name,
        email_address,
        password,
        first_name=first_name,
        last_name=last_name,
        creation_method='admin app',
        creator=g.user,
        # Do not pass site ID here; the account is not created on a site.
        ip_address=request.remote_addr,
    )
    if creation_result.is_err():
        failure_message = gettext(
            'User "%(screen_name)s" could not be created.',
            screen_name=screen_name,
        )
        flash_error(failure_message)
        return create_account_form(form)

    new_user, creation_event = creation_result.unwrap()

    success_message = gettext(
        'User "%(screen_name)s" has been created.',
        screen_name=new_user.screen_name,
    )
    flash_success(success_message)

    if site_for_email:
        confirmation_result = (
            user_creation_service.request_email_address_confirmation(
                new_user, email_address, site_for_email.id
            )
        )
        confirmation_result.unwrap()
        flash_success(
            gettext('An email has been sent to the corresponding address.'),
            icon='email',
        )

    user_signals.account_created.send(None, event=creation_event)

    return redirect_to('.view', user_id=new_user.id)
||
244 | |||
245 | |||
@blueprint.post('/<uuid:user_id>/initialize')
@permission_required('user.administrate')
@respond_no_content
def initialize_account(user_id):
    """Initialize the user account and flash a success message."""
    target_user = _get_user_or_404(user_id)

    user_command_service.initialize_account(target_user, initiator=g.user)

    message = gettext(
        "User '%(screen_name)s' has been initialized.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)
||
263 | |||
264 | |||
@blueprint.get('/<uuid:user_id>/suspend')
@permission_required('user.administrate')
@templated
def suspend_account_form(user_id, erroneous_form=None):
    """Render the form for suspending the user account.

    If the account is already suspended, flash an error and redirect
    to the user's profile view instead.
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    if not profile_user.suspended:
        return {
            'profile_user': profile_user,
            'user': profile_user,
            'form': erroneous_form or SuspendAccountForm(),
        }

    message = gettext(
        "User '%(screen_name)s' is already suspended.",
        screen_name=profile_user.screen_name,
    )
    flash_error(message)
    return redirect_to('.view', user_id=profile_user.id)
||
288 | |||
289 | |||
@blueprint.post('/<uuid:user_id>/suspend')
@permission_required('user.administrate')
def suspend_account(user_id):
    """Suspend the user account for the reason given in the form.

    An already suspended account yields an error message; an invalid
    form is rendered again. Either way, and on success, the response
    ends up at (or redirects to) a page for that user.
    """
    target_user = _get_user_or_404(user_id)

    if target_user.suspended:
        message = gettext(
            "User '%(screen_name)s' is already suspended.",
            screen_name=target_user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=target_user.id)

    form = SuspendAccountForm(request.form)
    if not form.validate():
        return suspend_account_form(target_user.id, form)

    suspension_event = user_command_service.suspend_account(
        target_user, g.user, form.reason.data.strip()
    )

    user_signals.account_suspended.send(None, event=suspension_event)

    message = gettext(
        "User '%(screen_name)s' has been suspended.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
324 | |||
325 | |||
@blueprint.get('/<uuid:user_id>/unsuspend')
@permission_required('user.administrate')
@templated
def unsuspend_account_form(user_id, erroneous_form=None):
    """Render the form for unsuspending the user account.

    If the account is not suspended, flash an error and redirect to
    the user's profile view instead.
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    if profile_user.suspended:
        # The suspension form class is used for unsuspension, too.
        return {
            'profile_user': profile_user,
            'user': profile_user,
            'form': erroneous_form or SuspendAccountForm(),
        }

    message = gettext(
        "User '%(screen_name)s' is not suspended.",
        screen_name=profile_user.screen_name,
    )
    flash_error(message)
    return redirect_to('.view', user_id=profile_user.id)
||
349 | |||
350 | |||
@blueprint.post('/<uuid:user_id>/unsuspend')
@permission_required('user.administrate')
def unsuspend_account(user_id):
    """Unsuspend the user account for the reason given in the form.

    An account that is not suspended yields an error message; an
    invalid form is rendered again.
    """
    target_user = _get_user_or_404(user_id)

    if not target_user.suspended:
        message = gettext(
            "User '%(screen_name)s' is not suspended.",
            screen_name=target_user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=target_user.id)

    # The suspension form class is used for unsuspension, too.
    form = SuspendAccountForm(request.form)
    if not form.validate():
        return unsuspend_account_form(target_user.id, form)

    unsuspension_event = user_command_service.unsuspend_account(
        target_user, g.user, form.reason.data.strip()
    )

    user_signals.account_unsuspended.send(None, event=unsuspension_event)

    message = gettext(
        "User '%(screen_name)s' has been unsuspended.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
385 | |||
386 | |||
@blueprint.get('/<uuid:user_id>/delete')
@permission_required('user.administrate')
@templated
def delete_account_form(user_id, erroneous_form=None):
    """Render the form for deleting the user account.

    If the account has already been deleted, flash an error and
    redirect to the user's profile view instead.
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    if not profile_user.deleted:
        return {
            'profile_user': profile_user,
            'user': profile_user,
            'form': erroneous_form or DeleteAccountForm(),
            'verification_text': ACCOUNT_DELETION_VERIFICATION_TEXT,
        }

    message = gettext(
        "User '%(screen_name)s' has already been deleted.",
        screen_name=profile_user.screen_name,
    )
    flash_error(message)
    return redirect_to('.view', user_id=profile_user.id)
||
411 | |||
412 | |||
@blueprint.post('/<uuid:user_id>/delete')
@permission_required('user.administrate')
def delete_account(user_id):
    """Delete the user account for the reason given in the form.

    An already deleted account yields an error message; an invalid
    form is rendered again.
    """
    target_user = _get_user_or_404(user_id)

    if target_user.deleted:
        message = gettext(
            "User '%(screen_name)s' has already been deleted.",
            screen_name=target_user.screen_name,
        )
        flash_error(message)
        return redirect_to('.view', user_id=target_user.id)

    form = DeleteAccountForm(request.form)
    if not form.validate():
        return delete_account_form(target_user.id, form)

    deletion_event = user_deletion_service.delete_account(
        target_user, g.user, form.reason.data.strip()
    )

    user_signals.account_deleted.send(None, event=deletion_event)

    message = gettext(
        "User '%(screen_name)s' has been deleted.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
447 | |||
448 | |||
449 | # -------------------------------------------------------------------- # |
||
450 | # screen name |
||
451 | |||
452 | |||
@blueprint.get('/<uuid:user_id>/change_screen_name')
@permission_required('user.administrate')
@templated
def change_screen_name_form(user_id, erroneous_form=None):
    """Render the form for changing the user's screen name."""
    profile_user = _get_user_for_admin_or_404(user_id)

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'form': erroneous_form or ChangeScreenNameForm(),
    }
||
467 | |||
468 | |||
@blueprint.post('/<uuid:user_id>/change_screen_name')
@permission_required('user.administrate')
def change_screen_name(user_id):
    """Change the user's screen name to the one given in the form.

    An invalid form is rendered again; on success the
    `screen_name_changed` signal is sent and the client is redirected
    to the user's profile view.
    """
    target_user = _get_user_or_404(user_id)

    form = ChangeScreenNameForm(request.form)
    if not form.validate():
        return change_screen_name_form(target_user.id, form)

    # Remember the current name for the success message.
    previous_name = target_user.screen_name
    requested_name = form.screen_name.data.strip()

    rename_event = user_command_service.change_screen_name(
        target_user,
        requested_name,
        g.user,
        reason=form.reason.data.strip(),
    )

    user_signals.screen_name_changed.send(None, event=rename_event)

    message = gettext(
        "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.",
        old_screen_name=previous_name,
        new_screen_name=requested_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
499 | |||
500 | |||
501 | # -------------------------------------------------------------------- # |
||
502 | # email address |
||
503 | |||
504 | |||
@blueprint.get('/<uuid:user_id>/change_email_address')
@permission_required('user.administrate')
@templated
def change_email_address_form(user_id, erroneous_form=None):
    """Render the form for changing the user's email address."""
    profile_user = _get_user_for_admin_or_404(user_id)

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'form': erroneous_form or ChangeEmailAddressForm(),
    }
||
519 | |||
520 | |||
@blueprint.post('/<uuid:user_id>/change_email_address')
@permission_required('user.administrate')
def change_email_address(user_id):
    """Change the user's email address to the one given in the form.

    The new address is stored as not verified. An invalid form is
    rendered again.
    """
    target_user = _get_user_or_404(user_id)

    form = ChangeEmailAddressForm(request.form)
    if not form.validate():
        return change_email_address_form(target_user.id, form)

    new_address = form.email_address.data.strip()
    is_verified = False

    change_event = user_command_service.change_email_address(
        target_user,
        new_address,
        is_verified,
        g.user,
        reason=form.reason.data.strip(),
    )

    user_signals.email_address_changed.send(None, event=change_event)

    message = gettext(
        "Email address for user '%(screen_name)s' has been updated.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
550 | |||
551 | |||
@blueprint.get('/<uuid:user_id>/invalidate_email_address')
@permission_required('user.administrate')
@templated
def invalidate_email_address_form(user_id, erroneous_form=None):
    """Render the form for invalidating the account's email address.

    If the address is not marked as verified, flash an error and
    redirect to the user's profile view instead.
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    address_data = user_service.get_email_address_data(user_id)
    if address_data.verified:
        return {
            'profile_user': profile_user,
            'user': profile_user,
            'form': erroneous_form or InvalidateEmailAddressForm(),
        }

    flash_error(gettext('Email address is already invalidated.'))
    return redirect_to('.view', user_id=profile_user.id)
||
571 | |||
572 | |||
@blueprint.post('/<uuid:user_id>/invalidate_email_address')
@permission_required('user.administrate')
def invalidate_email_address(user_id):
    """Invalidate the email address assigned with the account.

    An invalid form is rendered again. If the service reports an
    error, that error is flashed; otherwise the
    `email_address_invalidated` signal is sent. Both outcomes redirect
    to the user's profile view.
    """
    target_user = _get_user_or_404(user_id)

    form = InvalidateEmailAddressForm(request.form)
    if not form.validate():
        return invalidate_email_address_form(target_user.id, form)

    initiator = g.user
    reason = form.reason.data.strip()

    result = user_email_address_service.invalidate_email_address(
        target_user, reason, initiator=initiator
    )

    if result.is_err():
        flash_error(result.unwrap_err())
    else:
        user_signals.email_address_invalidated.send(
            None, event=result.unwrap()
        )

        message = gettext(
            "The email address of user '%(screen_name)s' has been invalidated.",
            screen_name=target_user.screen_name,
        )
        flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
606 | |||
607 | |||
608 | # -------------------------------------------------------------------- # |
||
609 | # details |
||
610 | |||
611 | |||
@blueprint.get('/<uuid:user_id>/details')
@permission_required('user.administrate')
@templated
def change_details_form(user_id, erroneous_form=None):
    """Render the form for changing the user's details.

    Unless a previously submitted form is passed in, the form is
    populated from the user's current details.
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    current_detail = user_service.get_detail(user_id)

    form = erroneous_form or ChangeDetailsForm(obj=current_detail)

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'form': form,
        'country_names': country_service.get_country_names(),
    }
||
630 | |||
631 | |||
@blueprint.post('/<uuid:user_id>/details')
@permission_required('user.administrate')
def change_details(user_id):
    """Update the user's details with the values given in the form.

    An invalid form is rendered again; on success the
    `details_updated` signal is sent and the client is redirected to
    the user's profile view.
    """
    target_user = _get_user_or_404(user_id)

    form = ChangeDetailsForm(request.form)
    if not form.validate():
        return change_details_form(target_user.id, form)

    # Text values are stripped; the date of birth is passed on as-is.
    update_event = user_command_service.update_user_details(
        target_user.id,
        form.first_name.data.strip(),
        form.last_name.data.strip(),
        form.date_of_birth.data,
        form.country.data.strip(),
        form.zip_code.data.strip(),
        form.city.data.strip(),
        form.street.data.strip(),
        form.phone_number.data.strip(),
        g.user,
    )

    flash_success(gettext('Changes have been saved.'))

    user_signals.details_updated.send(None, event=update_event)

    return redirect_to('.view', user_id=target_user.id)
||
670 | |||
671 | |||
672 | # -------------------------------------------------------------------- # |
||
673 | # authentication |
||
674 | |||
675 | |||
@blueprint.get('/<uuid:user_id>/password')
@permission_required('user.set_password')
@templated
def set_password_form(user_id, erroneous_form=None):
    """Render the form for setting a new password for the user."""
    profile_user = _get_user_for_admin_or_404(user_id)

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'form': erroneous_form or SetPasswordForm(),
    }
||
690 | |||
691 | |||
@blueprint.post('/<uuid:user_id>/password')
@permission_required('user.set_password')
def set_password(user_id):
    """Set the password given in the form as the user's new password.

    An invalid form is rendered again; on success the
    `password_updated` signal is sent and the client is redirected to
    the user's profile view.
    """
    target_user = _get_user_or_404(user_id)

    form = SetPasswordForm(request.form)
    if not form.validate():
        return set_password_form(target_user.id, form)

    update_event = authn_password_service.update_password_hash(
        target_user, form.password.data, g.user
    )

    authn_signals.password_updated.send(None, event=update_event)

    message = gettext(
        "New password has been set for user '%(screen_name)s'.",
        screen_name=target_user.screen_name,
    )
    flash_success(message)

    return redirect_to('.view', user_id=target_user.id)
||
719 | |||
720 | |||
721 | # -------------------------------------------------------------------- # |
||
722 | # authorization |
||
723 | |||
724 | |||
@blueprint.get('/<uuid:user_id>/permissions')
@permission_required('user.view')
@templated
def view_permissions(user_id):
    """Show the user's permissions, grouped by role."""
    profile_user = _get_user_for_admin_or_404(user_id)

    permission_ids_by_role = authz_service.get_permission_ids_by_role_for_user(
        profile_user.id
    )

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'permissions_by_role': _index_permissions_by_role(
            permission_ids_by_role
        ),
    }
||
745 | |||
746 | |||
@blueprint.get('/<uuid:user_id>/roles/assignment')
@permission_required('role.assign')
@templated
def manage_roles(user_id):
    """Show all roles and which of them are assigned to the user."""
    profile_user = _get_user_for_admin_or_404(user_id)

    permissions_by_role = _index_permissions_by_role(
        authz_service.get_permission_ids_by_role()
    )

    assigned_role_ids = authz_service.find_role_ids_for_user(profile_user.id)

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'permissions_by_role': permissions_by_role,
        'user_role_ids': assigned_role_ids,
    }
||
766 | |||
767 | |||
def _index_permissions_by_role(
    permission_ids_by_role: dict[Role, frozenset[PermissionID]]
) -> dict[Role, frozenset[Permission]]:
    """Map each role to its registered permissions.

    Permission IDs that are not known to the permission registry are
    left out of the result.
    """
    known_permissions = {}
    for permission in permission_registry.get_registered_permissions():
        known_permissions[permission.id] = permission

    permissions_by_role = {}
    for role, permission_ids in permission_ids_by_role.items():
        permissions_by_role[role] = frozenset(
            known_permissions[permission_id]
            for permission_id in permission_ids
            if permission_id in known_permissions
        )

    return permissions_by_role
||
784 | |||
785 | |||
@blueprint.post('/<uuid:user_id>/roles/<role_id>')
@permission_required('role.assign')
@respond_no_content
def role_assign(user_id, role_id):
    """Assign the role to the user.

    The `role_assigned_to_user` signal is only sent if the service
    returned an event; the success message is flashed either way.
    """
    target_user = _get_user_or_404(user_id)
    role = _get_role_or_404(role_id)

    assignment_event = authz_service.assign_role_to_user(
        role.id, target_user, initiator=g.user
    )

    if assignment_event is not None:
        authz_signals.role_assigned_to_user.send(None, event=assignment_event)

    message = gettext(
        '%(role_title)s has been assigned to "%(screen_name)s".',
        screen_name=target_user.screen_name,
        role_title=role.title,
    )
    flash_success(message)
||
809 | |||
810 | |||
@blueprint.delete('/<uuid:user_id>/roles/<role_id>')
@permission_required('role.assign')
@respond_no_content
def role_deassign(user_id, role_id):
    """Deassign the role from the user and send the matching signal."""
    target_user = _get_user_or_404(user_id)
    role = _get_role_or_404(role_id)

    deassignment_result = authz_service.deassign_role_from_user(
        role.id, target_user, initiator=g.user
    )
    deassignment_event = deassignment_result.unwrap()

    authz_signals.role_deassigned_from_user.send(
        None, event=deassignment_event
    )

    message = gettext(
        '%(role_title)s has been withdrawn from "%(screen_name)s".',
        screen_name=target_user.screen_name,
        role_title=role.title,
    )
    flash_success(message)
||
833 | |||
834 | |||
835 | # -------------------------------------------------------------------- # |
||
836 | # events |
||
837 | |||
838 | |||
@blueprint.get('/<uuid:user_id>/events')
@permission_required('user.view')
@templated
def view_events(user_id):
    """Show the user's log entries.

    Login entries are left out unless the ``include_logins`` query
    argument is ``yes`` (which is also its default).
    """
    profile_user = _get_user_for_admin_or_404(user_id)

    log_entries = list(service.get_log_entries(profile_user.id))

    include_logins_arg = request.args.get('include_logins', default='yes')
    logins_included = include_logins_arg == 'yes'

    if not logins_included:
        log_entries = list(
            filter(
                lambda entry: entry['event_type'] != 'user-logged-in',
                log_entries,
            )
        )

    return {
        'profile_user': profile_user,
        'user': profile_user,
        'log_entries': log_entries,
        'logins_included': logins_included,
    }
||
862 | |||
863 | |||
864 | # -------------------------------------------------------------------- # |
||
865 | # helpers |
||
866 | |||
867 | |||
def _get_user_for_admin_or_404(user_id) -> UserForAdmin:
    """Return the admin representation of the user, or abort with 404."""
    found = user_service.find_user_for_admin(user_id)

    if found is not None:
        return found

    abort(404)
|
875 | |||
876 | |||
def _get_user_or_404(user_id):
    """Return the user with that ID, or abort with 404 if not found."""
    found = user_service.find_user(user_id)

    if found is not None:
        return found

    abort(404)
||
884 | |||
885 | |||
def _get_role_or_404(role_id):
    """Return the role with that ID, or abort with 404 if not found."""
    found = authz_service.find_role(role_id)

    if found is not None:
        return found

    abort(404)
||
893 |