Total Complexity | 65 |
Total Lines | 876 |
Duplicated Lines | 13.01 % |
Coverage | 53.39% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like byceps.blueprints.admin.user.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | byceps.blueprints.admin.user.views |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2006-2021 Jochen Kupperschmidt |
||
6 | :License: Revised BSD (see `LICENSE` file for details) |
||
7 | """ |
||
8 | |||
9 | 1 | from __future__ import annotations |
|
10 | 1 | from datetime import datetime |
|
11 | 1 | from typing import Optional |
|
12 | |||
13 | 1 | from flask import abort, g, request |
|
14 | 1 | from flask_babel import gettext |
|
15 | |||
16 | 1 | from ....services.authentication.password import service as password_service |
|
17 | 1 | from ....services.authentication.session import service as session_service |
|
18 | 1 | from ....services.authorization import service as authorization_service |
|
19 | 1 | from ....services.authorization.transfer.models import ( |
|
20 | Role, |
||
21 | Permission, |
||
22 | PermissionID, |
||
23 | ) |
||
24 | 1 | from ....services.country import service as country_service |
|
25 | 1 | from ....services.orga_team import service as orga_team_service |
|
26 | 1 | from ....services.shop.order import service as order_service |
|
27 | 1 | from ....services.shop.shop import service as shop_service |
|
28 | 1 | from ....services.site import service as site_service |
|
29 | 1 | from ....services.user import ( |
|
30 | command_service as user_command_service, |
||
31 | creation_service as user_creation_service, |
||
32 | deletion_service as user_deletion_service, |
||
33 | email_address_verification_service, |
||
34 | service as user_service, |
||
35 | stats_service as user_stats_service, |
||
36 | ) |
||
37 | 1 | from ....services.user.transfer.models import UserForAdmin, UserStateFilter |
|
38 | 1 | from ....services.user_badge import awarding_service as badge_awarding_service |
|
39 | 1 | from ....signals import user as user_signals |
|
40 | 1 | from ....util.authorization import permission_registry |
|
41 | 1 | from ....util.framework.blueprint import create_blueprint |
|
42 | 1 | from ....util.framework.flash import flash_error, flash_success |
|
43 | 1 | from ....util.framework.templating import templated |
|
44 | 1 | from ....util.views import permission_required, redirect_to, respond_no_content |
|
45 | |||
46 | 1 | from .forms import ( |
|
47 | ChangeDetailsForm, |
||
48 | ChangeEmailAddressForm, |
||
49 | ChangeScreenNameForm, |
||
50 | CreateAccountForm, |
||
51 | DeleteAccountForm, |
||
52 | InvalidateEmailAddressForm, |
||
53 | SetPasswordForm, |
||
54 | SuspendAccountForm, |
||
55 | ) |
||
56 | 1 | from . import service |
|
57 | |||
58 | |||
59 | 1 | blueprint = create_blueprint('user_admin', __name__) |
|
60 | |||
61 | |||
62 | 1 | @blueprint.get('/', defaults={'page': 1}) |
|
63 | 1 | @blueprint.get('/pages/<int:page>') |
|
64 | 1 | @permission_required('user.view') |
|
65 | 1 | @templated |
|
66 | def index(page): |
||
67 | """List users.""" |
||
68 | 1 | per_page = request.args.get('per_page', type=int, default=20) |
|
69 | 1 | search_term = request.args.get('search_term', default='').strip() |
|
70 | 1 | only = request.args.get('only') |
|
71 | |||
72 | 1 | user_state_filter = UserStateFilter.__members__.get( |
|
73 | only, UserStateFilter.none |
||
74 | ) |
||
75 | |||
76 | 1 | users = user_service.get_users_paginated( |
|
77 | page, per_page, search_term=search_term, state_filter=user_state_filter |
||
78 | ) |
||
79 | |||
80 | 1 | total_active = user_stats_service.count_active_users() |
|
81 | 1 | total_uninitialized = user_stats_service.count_uninitialized_users() |
|
82 | 1 | total_suspended = user_stats_service.count_suspended_users() |
|
83 | 1 | total_deleted = user_stats_service.count_deleted_users() |
|
84 | 1 | total_overall = user_stats_service.count_users() |
|
85 | |||
86 | 1 | return { |
|
87 | 'users': users, |
||
88 | 'total_active': total_active, |
||
89 | 'total_uninitialized': total_uninitialized, |
||
90 | 'total_suspended': total_suspended, |
||
91 | 'total_deleted': total_deleted, |
||
92 | 'total_overall': total_overall, |
||
93 | 'search_term': search_term, |
||
94 | 'only': only, |
||
95 | 'UserStateFilter': UserStateFilter, |
||
96 | 'user_state_filter': user_state_filter, |
||
97 | } |
||
98 | |||
99 | |||
100 | 1 | @blueprint.get('/<uuid:user_id>') |
|
101 | 1 | @permission_required('user.view') |
|
102 | 1 | @templated |
|
103 | def view(user_id): |
||
104 | """Show a user's interal profile.""" |
||
105 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
106 | 1 | db_user = user_service.find_user_with_details(user.id) |
|
107 | |||
108 | 1 | recent_login = session_service.find_recent_login(user.id) |
|
109 | 1 | days_since_recent_login = _calculate_days_since(recent_login) |
|
110 | |||
111 | 1 | orga_activities = orga_team_service.get_orga_activities_for_user(user.id) |
|
112 | |||
113 | 1 | newsletter_subscription_states = list( |
|
114 | service.get_newsletter_subscription_states(user.id) |
||
115 | ) |
||
116 | 1 | newsletter_subscription_count = sum( |
|
117 | 1 for _, subscribed in newsletter_subscription_states if subscribed |
||
118 | ) |
||
119 | |||
120 | 1 | orders = order_service.get_orders_placed_by_user(user.id) |
|
121 | |||
122 | 1 | order_shop_ids = {order.shop_id for order in orders} |
|
123 | 1 | shops = shop_service.find_shops(order_shop_ids) |
|
124 | 1 | shops_by_id = {shop.id: shop for shop in shops} |
|
125 | |||
126 | 1 | parties_and_tickets = service.get_parties_and_tickets(user.id) |
|
127 | 1 | ticket_count = sum(len(tickets) for _, tickets in parties_and_tickets) |
|
128 | |||
129 | 1 | attended_parties = service.get_attended_parties(user.id) |
|
130 | |||
131 | 1 | badges_with_awarding_quantity = ( |
|
132 | badge_awarding_service.get_badges_awarded_to_user(user.id) |
||
133 | ) |
||
134 | 1 | badge_count = len(badges_with_awarding_quantity) |
|
135 | |||
136 | 1 | return { |
|
137 | 'profile_user': user, |
||
138 | 'user': db_user, |
||
139 | 'recent_login': recent_login, |
||
140 | 'days_since_recent_login': days_since_recent_login, |
||
141 | 'orga_activities': orga_activities, |
||
142 | 'newsletter_subscription_count': newsletter_subscription_count, |
||
143 | 'newsletter_subscription_states': newsletter_subscription_states, |
||
144 | 'orders': orders, |
||
145 | 'shops_by_id': shops_by_id, |
||
146 | 'parties_and_tickets': parties_and_tickets, |
||
147 | 'ticket_count': ticket_count, |
||
148 | 'attended_parties': attended_parties, |
||
149 | 'badge_count': badge_count, |
||
150 | 'badges_with_awarding_quantity': badges_with_awarding_quantity, |
||
151 | } |
||
152 | |||
153 | |||
154 | 1 | def _calculate_days_since(dt: Optional[datetime]) -> Optional[int]: |
|
155 | 1 | if dt is None: |
|
156 | 1 | return None |
|
157 | |||
158 | return (datetime.utcnow().date() - dt.date()).days |
||
159 | |||
160 | |||
161 | # -------------------------------------------------------------------- # |
||
162 | # account |
||
163 | |||
164 | |||
165 | 1 | @blueprint.get('/create') |
|
166 | 1 | @permission_required('user.create') |
|
167 | 1 | @templated |
|
168 | 1 | def create_account_form(erroneous_form=None): |
|
169 | """Show a form to create a user account.""" |
||
170 | 1 | form = erroneous_form if erroneous_form else CreateAccountForm() |
|
171 | 1 | form.set_site_choices() |
|
172 | |||
173 | 1 | return {'form': form} |
|
174 | |||
175 | |||
176 | 1 | @blueprint.post('/') |
|
177 | 1 | @permission_required('user.create') |
|
178 | def create_account(): |
||
179 | """Create a user account.""" |
||
180 | form = CreateAccountForm(request.form) |
||
181 | form.set_site_choices() |
||
182 | |||
183 | if not form.validate(): |
||
184 | return create_account_form(form) |
||
185 | |||
186 | screen_name = form.screen_name.data.strip() |
||
187 | first_names = form.first_names.data.strip() |
||
188 | last_name = form.last_name.data.strip() |
||
189 | email_address = form.email_address.data.lower() |
||
190 | password = form.password.data |
||
191 | site_id_for_email = form.site_id.data |
||
192 | |||
193 | if site_id_for_email: |
||
194 | site_for_email = site_service.get_site(site_id_for_email) |
||
195 | else: |
||
196 | site_for_email = None |
||
197 | |||
198 | initiator_id = g.user.id |
||
199 | |||
200 | try: |
||
201 | user, event = user_creation_service.create_user( |
||
202 | screen_name, |
||
203 | email_address, |
||
204 | password, |
||
205 | first_names=first_names, |
||
206 | last_name=last_name, |
||
207 | creator_id=initiator_id, |
||
208 | # Do not pass site ID here; the account is not created on a site. |
||
209 | ) |
||
210 | except user_creation_service.UserCreationFailed: |
||
211 | flash_error( |
||
212 | gettext( |
||
213 | 'User "%(screen_name)s" could not be created.', |
||
214 | screen_name=screen_name, |
||
215 | ) |
||
216 | ) |
||
217 | return create_account_form(form) |
||
218 | |||
219 | flash_success( |
||
220 | gettext( |
||
221 | 'User "%(screen_name)s" has been created.', |
||
222 | screen_name=user.screen_name, |
||
223 | ) |
||
224 | ) |
||
225 | |||
226 | if site_for_email: |
||
227 | user_creation_service.request_email_address_confirmation( |
||
228 | user, email_address, site_for_email.id |
||
229 | ) |
||
230 | flash_success( |
||
231 | gettext('An email has been sent to the corresponding address.'), |
||
232 | icon='email', |
||
233 | ) |
||
234 | |||
235 | user_signals.account_created.send(None, event=event) |
||
236 | |||
237 | return redirect_to('.view', user_id=user.id) |
||
238 | |||
239 | |||
240 | 1 | @blueprint.post('/<uuid:user_id>/initialize') |
|
241 | 1 | @permission_required('user.administrate') |
|
242 | 1 | @respond_no_content |
|
243 | def initialize_account(user_id): |
||
244 | """Initialize the user account.""" |
||
245 | user = _get_user_or_404(user_id) |
||
246 | |||
247 | initiator_id = g.user.id |
||
248 | |||
249 | user_command_service.initialize_account(user.id, initiator_id=initiator_id) |
||
250 | |||
251 | flash_success( |
||
252 | gettext( |
||
253 | "User '%(screen_name)s' has been initialized.", |
||
254 | screen_name=user.screen_name, |
||
255 | ) |
||
256 | ) |
||
257 | |||
258 | |||
259 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/suspend') |
260 | 1 | @permission_required('user.administrate') |
|
261 | 1 | @templated |
|
262 | 1 | def suspend_account_form(user_id, erroneous_form=None): |
|
263 | """Show form to suspend the user account.""" |
||
264 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
265 | |||
266 | 1 | if user.suspended: |
|
267 | 1 | flash_error( |
|
268 | gettext( |
||
269 | "User '%(screen_name)s' is already suspended.", |
||
270 | screen_name=user.screen_name, |
||
271 | ) |
||
272 | ) |
||
273 | 1 | return redirect_to('.view', user_id=user.id) |
|
274 | |||
275 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
276 | |||
277 | 1 | return { |
|
278 | 'profile_user': user, |
||
279 | 'user': user, |
||
280 | 'form': form, |
||
281 | } |
||
282 | |||
283 | |||
284 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/suspend') |
285 | 1 | @permission_required('user.administrate') |
|
286 | def suspend_account(user_id): |
||
287 | """Suspend the user account.""" |
||
288 | user = _get_user_or_404(user_id) |
||
289 | |||
290 | if user.suspended: |
||
291 | flash_error( |
||
292 | gettext( |
||
293 | "User '%(screen_name)s' is already suspended.", |
||
294 | screen_name=user.screen_name, |
||
295 | ) |
||
296 | ) |
||
297 | return redirect_to('.view', user_id=user.id) |
||
298 | |||
299 | form = SuspendAccountForm(request.form) |
||
300 | if not form.validate(): |
||
301 | return suspend_account_form(user.id, form) |
||
302 | |||
303 | initiator_id = g.user.id |
||
304 | reason = form.reason.data.strip() |
||
305 | |||
306 | event = user_command_service.suspend_account(user.id, initiator_id, reason) |
||
307 | |||
308 | user_signals.account_suspended.send(None, event=event) |
||
309 | |||
310 | flash_success( |
||
311 | gettext( |
||
312 | "User '%(screen_name)s' has been suspended.", |
||
313 | screen_name=user.screen_name, |
||
314 | ) |
||
315 | ) |
||
316 | |||
317 | return redirect_to('.view', user_id=user.id) |
||
318 | |||
319 | |||
320 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/unsuspend') |
321 | 1 | @permission_required('user.administrate') |
|
322 | 1 | @templated |
|
323 | 1 | def unsuspend_account_form(user_id, erroneous_form=None): |
|
324 | """Show form to unsuspend the user account.""" |
||
325 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
326 | |||
327 | 1 | if not user.suspended: |
|
328 | 1 | flash_error( |
|
329 | gettext( |
||
330 | "User '%(screen_name)s' is not suspended.", |
||
331 | screen_name=user.screen_name, |
||
332 | ) |
||
333 | ) |
||
334 | 1 | return redirect_to('.view', user_id=user.id) |
|
335 | |||
336 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
337 | |||
338 | 1 | return { |
|
339 | 'profile_user': user, |
||
340 | 'user': user, |
||
341 | 'form': form, |
||
342 | } |
||
343 | |||
344 | |||
345 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/unsuspend') |
346 | 1 | @permission_required('user.administrate') |
|
347 | def unsuspend_account(user_id): |
||
348 | """Unsuspend the user account.""" |
||
349 | user = _get_user_or_404(user_id) |
||
350 | |||
351 | if not user.suspended: |
||
352 | flash_error( |
||
353 | gettext( |
||
354 | "User '%(screen_name)s' is not suspended.", |
||
355 | screen_name=user.screen_name, |
||
356 | ) |
||
357 | ) |
||
358 | return redirect_to('.view', user_id=user.id) |
||
359 | |||
360 | form = SuspendAccountForm(request.form) |
||
361 | if not form.validate(): |
||
362 | return unsuspend_account_form(user.id, form) |
||
363 | |||
364 | initiator_id = g.user.id |
||
365 | reason = form.reason.data.strip() |
||
366 | |||
367 | event = user_command_service.unsuspend_account( |
||
368 | user.id, initiator_id, reason |
||
369 | ) |
||
370 | |||
371 | user_signals.account_unsuspended.send(None, event=event) |
||
372 | |||
373 | flash_success( |
||
374 | gettext( |
||
375 | "User '%(screen_name)s' has been unsuspended.", |
||
376 | screen_name=user.screen_name, |
||
377 | ) |
||
378 | ) |
||
379 | |||
380 | return redirect_to('.view', user_id=user.id) |
||
381 | |||
382 | |||
383 | 1 | @blueprint.get('/<uuid:user_id>/delete') |
|
384 | 1 | @permission_required('user.administrate') |
|
385 | 1 | @templated |
|
386 | 1 | def delete_account_form(user_id, erroneous_form=None): |
|
387 | """Show form to delete the user account.""" |
||
388 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
389 | |||
390 | 1 | if user.deleted: |
|
391 | 1 | flash_error( |
|
392 | gettext( |
||
393 | "User '%(screen_name)s' has already been deleted.", |
||
394 | screen_name=user.screen_name, |
||
395 | ) |
||
396 | ) |
||
397 | 1 | return redirect_to('.view', user_id=user.id) |
|
398 | |||
399 | 1 | form = erroneous_form if erroneous_form else DeleteAccountForm() |
|
400 | |||
401 | 1 | return { |
|
402 | 'profile_user': user, |
||
403 | 'user': user, |
||
404 | 'form': form, |
||
405 | } |
||
406 | |||
407 | |||
408 | 1 | @blueprint.post('/<uuid:user_id>/delete') |
|
409 | 1 | @permission_required('user.administrate') |
|
410 | def delete_account(user_id): |
||
411 | """Delete the user account.""" |
||
412 | user = _get_user_or_404(user_id) |
||
413 | |||
414 | if user.deleted: |
||
415 | flash_error( |
||
416 | gettext( |
||
417 | "User '%(screen_name)s' has already been deleted.", |
||
418 | screen_name=user.screen_name, |
||
419 | ) |
||
420 | ) |
||
421 | return redirect_to('.view', user_id=user.id) |
||
422 | |||
423 | form = DeleteAccountForm(request.form) |
||
424 | if not form.validate(): |
||
425 | return delete_account_form(user.id, form) |
||
426 | |||
427 | initiator_id = g.user.id |
||
428 | reason = form.reason.data.strip() |
||
429 | |||
430 | event = user_deletion_service.delete_account(user.id, initiator_id, reason) |
||
431 | |||
432 | user_signals.account_deleted.send(None, event=event) |
||
433 | |||
434 | flash_success( |
||
435 | gettext( |
||
436 | "User '%(screen_name)s' has been deleted.", |
||
437 | screen_name=user.screen_name, |
||
438 | ) |
||
439 | ) |
||
440 | |||
441 | return redirect_to('.view', user_id=user.id) |
||
442 | |||
443 | |||
444 | # -------------------------------------------------------------------- # |
||
445 | # screen name |
||
446 | |||
447 | |||
448 | 1 | @blueprint.get('/<uuid:user_id>/change_screen_name') |
|
449 | 1 | @permission_required('user.administrate') |
|
450 | 1 | @templated |
|
451 | 1 | def change_screen_name_form(user_id, erroneous_form=None): |
|
452 | """Show form to change the user's screen name.""" |
||
453 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
454 | |||
455 | 1 | form = erroneous_form if erroneous_form else ChangeScreenNameForm() |
|
456 | |||
457 | 1 | return { |
|
458 | 'profile_user': user, |
||
459 | 'user': user, |
||
460 | 'form': form, |
||
461 | } |
||
462 | |||
463 | |||
464 | 1 | @blueprint.post('/<uuid:user_id>/change_screen_name') |
|
465 | 1 | @permission_required('user.administrate') |
|
466 | def change_screen_name(user_id): |
||
467 | """Change the user's screen name.""" |
||
468 | user = _get_user_or_404(user_id) |
||
469 | |||
470 | form = ChangeScreenNameForm(request.form) |
||
471 | if not form.validate(): |
||
472 | return change_screen_name_form(user.id, form) |
||
473 | |||
474 | old_screen_name = user.screen_name |
||
475 | new_screen_name = form.screen_name.data.strip() |
||
476 | initiator_id = g.user.id |
||
477 | reason = form.reason.data.strip() |
||
478 | |||
479 | event = user_command_service.change_screen_name( |
||
480 | user.id, new_screen_name, initiator_id, reason=reason |
||
481 | ) |
||
482 | |||
483 | user_signals.screen_name_changed.send(None, event=event) |
||
484 | |||
485 | flash_success( |
||
486 | gettext( |
||
487 | "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.", |
||
488 | old_screen_name=old_screen_name, |
||
489 | new_screen_name=new_screen_name, |
||
490 | ) |
||
491 | ) |
||
492 | |||
493 | return redirect_to('.view', user_id=user.id) |
||
494 | |||
495 | |||
496 | # -------------------------------------------------------------------- # |
||
497 | # email address |
||
498 | |||
499 | |||
500 | 1 | @blueprint.get('/<uuid:user_id>/change_email_address') |
|
501 | 1 | @permission_required('user.administrate') |
|
502 | 1 | @templated |
|
503 | 1 | def change_email_address_form(user_id, erroneous_form=None): |
|
504 | """Show form to change the user's e-mail address.""" |
||
505 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
506 | |||
507 | 1 | form = erroneous_form if erroneous_form else ChangeEmailAddressForm() |
|
508 | |||
509 | 1 | return { |
|
510 | 'profile_user': user, |
||
511 | 'user': user, |
||
512 | 'form': form, |
||
513 | } |
||
514 | |||
515 | |||
516 | 1 | @blueprint.post('/<uuid:user_id>/change_email_address') |
|
517 | 1 | @permission_required('user.administrate') |
|
518 | def change_email_address(user_id): |
||
519 | """Change the user's e-mail address.""" |
||
520 | user = _get_user_or_404(user_id) |
||
521 | |||
522 | form = ChangeEmailAddressForm(request.form) |
||
523 | if not form.validate(): |
||
524 | return change_email_address_form(user.id, form) |
||
525 | |||
526 | old_email_address = user_service.find_email_address(user.id) |
||
527 | new_email_address = form.email_address.data.strip() |
||
528 | verified = False |
||
529 | initiator_id = g.user.id |
||
530 | reason = form.reason.data.strip() |
||
531 | |||
532 | event = user_command_service.change_email_address( |
||
533 | user.id, new_email_address, verified, initiator_id, reason=reason |
||
534 | ) |
||
535 | |||
536 | user_signals.email_address_changed.send(None, event=event) |
||
537 | |||
538 | flash_success( |
||
539 | gettext( |
||
540 | "Email address for user '%(screen_name)s' has been updated.", |
||
541 | screen_name=user.screen_name, |
||
542 | ) |
||
543 | ) |
||
544 | |||
545 | return redirect_to('.view', user_id=user.id) |
||
546 | |||
547 | |||
548 | 1 | @blueprint.get('/<uuid:user_id>/invalidate_email_address') |
|
549 | 1 | @permission_required('user.administrate') |
|
550 | 1 | @templated |
|
551 | 1 | def invalidate_email_address_form(user_id, erroneous_form=None): |
|
552 | """Show form to invalidate the email address assigned with the account.""" |
||
553 | user = _get_user_for_admin_or_404(user_id) |
||
554 | |||
555 | email_address = user_service.get_email_address_data(user_id) |
||
556 | if not email_address.verified: |
||
557 | flash_error(gettext('Email address is already invalidated.')) |
||
558 | return redirect_to('.view', user_id=user.id) |
||
559 | |||
560 | form = erroneous_form if erroneous_form else InvalidateEmailAddressForm() |
||
561 | |||
562 | return { |
||
563 | 'profile_user': user, |
||
564 | 'user': user, |
||
565 | 'form': form, |
||
566 | } |
||
567 | |||
568 | |||
569 | 1 | @blueprint.post('/<uuid:user_id>/invalidate_email_address') |
|
570 | 1 | @permission_required('user.administrate') |
|
571 | def invalidate_email_address(user_id): |
||
572 | """Invalidate the email address assigned with the account.""" |
||
573 | user = _get_user_or_404(user_id) |
||
574 | |||
575 | email_address = user_service.get_email_address_data(user_id) |
||
576 | if not email_address.verified: |
||
577 | flash_error(gettext('Email address is already invalidated.')) |
||
578 | return redirect_to('.view', user_id=user.id) |
||
579 | |||
580 | form = InvalidateEmailAddressForm(request.form) |
||
581 | if not form.validate(): |
||
582 | return invalidate_email_address_form(user.id, form) |
||
583 | |||
584 | initiator_id = g.user.id |
||
585 | reason = form.reason.data.strip() |
||
586 | |||
587 | event = email_address_verification_service.invalidate_email_address( |
||
588 | user.id, reason, initiator_id=initiator_id |
||
589 | ) |
||
590 | |||
591 | user_signals.email_address_invalidated.send(None, event=event) |
||
592 | |||
593 | flash_success( |
||
594 | gettext( |
||
595 | "The email address of user '%(screen_name)s' has been invalidated.", |
||
596 | screen_name=user.screen_name, |
||
597 | ) |
||
598 | ) |
||
599 | |||
600 | return redirect_to('.view', user_id=user.id) |
||
601 | |||
602 | |||
603 | # -------------------------------------------------------------------- # |
||
604 | # details |
||
605 | |||
606 | |||
607 | 1 | @blueprint.get('/<uuid:user_id>/details') |
|
608 | 1 | @permission_required('user.administrate') |
|
609 | 1 | @templated |
|
610 | 1 | def change_details_form(user_id, erroneous_form=None): |
|
611 | """Show a form to change the user's details.""" |
||
612 | user = _get_user_for_admin_or_404(user_id) |
||
613 | |||
614 | detail = user_service.get_detail(user_id) |
||
615 | |||
616 | form = erroneous_form if erroneous_form else ChangeDetailsForm(obj=detail) |
||
617 | country_names = country_service.get_country_names() |
||
618 | |||
619 | return { |
||
620 | 'profile_user': user, |
||
621 | 'user': user, |
||
622 | 'form': form, |
||
623 | 'country_names': country_names, |
||
624 | } |
||
625 | |||
626 | |||
627 | 1 | @blueprint.post('/<uuid:user_id>/details') |
|
628 | 1 | @permission_required('user.administrate') |
|
629 | def change_details(user_id): |
||
630 | """Change the user's details.""" |
||
631 | user = _get_user_or_404(user_id) |
||
632 | |||
633 | form = ChangeDetailsForm(request.form) |
||
634 | if not form.validate(): |
||
635 | return change_details_form(user.id, form) |
||
636 | |||
637 | first_names = form.first_names.data.strip() |
||
638 | last_name = form.last_name.data.strip() |
||
639 | date_of_birth = form.date_of_birth.data |
||
640 | country = form.country.data.strip() |
||
641 | zip_code = form.zip_code.data.strip() |
||
642 | city = form.city.data.strip() |
||
643 | street = form.street.data.strip() |
||
644 | phone_number = form.phone_number.data.strip() |
||
645 | |||
646 | event = user_command_service.update_user_details( |
||
647 | user.id, |
||
648 | first_names, |
||
649 | last_name, |
||
650 | date_of_birth, |
||
651 | country, |
||
652 | zip_code, |
||
653 | city, |
||
654 | street, |
||
655 | phone_number, |
||
656 | g.user.id, # initiator_id |
||
657 | ) |
||
658 | |||
659 | flash_success(gettext('Changes have been saved.')) |
||
660 | |||
661 | user_signals.details_updated.send(None, event=event) |
||
662 | |||
663 | return redirect_to('.view', user_id=user.id) |
||
664 | |||
665 | |||
666 | # -------------------------------------------------------------------- # |
||
667 | # authentication |
||
668 | |||
669 | |||
670 | 1 | @blueprint.get('/<uuid:user_id>/password') |
|
671 | 1 | @permission_required('user.set_password') |
|
672 | 1 | @templated |
|
673 | 1 | def set_password_form(user_id, erroneous_form=None): |
|
674 | """Show a form to set a new password for the user.""" |
||
675 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
676 | |||
677 | 1 | form = erroneous_form if erroneous_form else SetPasswordForm() |
|
678 | |||
679 | 1 | return { |
|
680 | 'profile_user': user, |
||
681 | 'user': user, |
||
682 | 'form': form, |
||
683 | } |
||
684 | |||
685 | |||
686 | 1 | @blueprint.post('/<uuid:user_id>/password') |
|
687 | 1 | @permission_required('user.set_password') |
|
688 | def set_password(user_id): |
||
689 | """Set a new password for the user.""" |
||
690 | user = _get_user_or_404(user_id) |
||
691 | |||
692 | form = SetPasswordForm(request.form) |
||
693 | if not form.validate(): |
||
694 | return set_password_form(user.id, form) |
||
695 | |||
696 | new_password = form.password.data |
||
697 | initiator_id = g.user.id |
||
698 | |||
699 | password_service.update_password_hash(user.id, new_password, initiator_id) |
||
700 | |||
701 | flash_success( |
||
702 | gettext( |
||
703 | "New password has been set for user '%(screen_name)s'.", |
||
704 | screen_name=user.screen_name, |
||
705 | ) |
||
706 | ) |
||
707 | |||
708 | return redirect_to('.view', user_id=user.id) |
||
709 | |||
710 | |||
711 | # -------------------------------------------------------------------- # |
||
712 | # authorization |
||
713 | |||
714 | |||
715 | 1 | @blueprint.get('/<uuid:user_id>/permissions') |
|
716 | 1 | @permission_required('user.view') |
|
717 | 1 | @templated |
|
718 | def view_permissions(user_id): |
||
719 | """Show user's permissions.""" |
||
720 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
721 | |||
722 | 1 | user_permission_ids_by_role = ( |
|
723 | authorization_service.get_permission_ids_by_role_for_user(user.id) |
||
724 | ) |
||
725 | |||
726 | 1 | permissions_by_role = _index_permissions_by_role( |
|
727 | user_permission_ids_by_role |
||
728 | ) |
||
729 | |||
730 | 1 | return { |
|
731 | 'profile_user': user, |
||
732 | 'user': user, |
||
733 | 'permissions_by_role': permissions_by_role, |
||
734 | } |
||
735 | |||
736 | |||
737 | 1 | @blueprint.get('/<uuid:user_id>/roles/assignment') |
|
738 | 1 | @permission_required('role.assign') |
|
739 | 1 | @templated |
|
740 | def manage_roles(user_id): |
||
741 | """Manage what roles are assigned to the user.""" |
||
742 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
743 | |||
744 | 1 | permission_ids_by_role = authorization_service.get_permission_ids_by_role() |
|
745 | |||
746 | 1 | permissions_by_role = _index_permissions_by_role(permission_ids_by_role) |
|
747 | |||
748 | 1 | user_role_ids = authorization_service.find_role_ids_for_user(user.id) |
|
749 | |||
750 | 1 | return { |
|
751 | 'profile_user': user, |
||
752 | 'user': user, |
||
753 | 'permissions_by_role': permissions_by_role, |
||
754 | 'user_role_ids': user_role_ids, |
||
755 | } |
||
756 | |||
757 | |||
758 | 1 | def _index_permissions_by_role( |
|
759 | permission_ids_by_role: dict[Role, frozenset[PermissionID]] |
||
760 | ) -> dict[Role, frozenset[Permission]]: |
||
761 | 1 | registered_permissions_by_id = { |
|
762 | permission.id: permission |
||
763 | for permission in permission_registry.get_registered_permissions() |
||
764 | } |
||
765 | |||
766 | 1 | return { |
|
767 | role: frozenset( |
||
768 | registered_permissions_by_id[permission_id] |
||
769 | for permission_id in permission_ids |
||
770 | if permission_id in registered_permissions_by_id |
||
771 | ) |
||
772 | for role, permission_ids in permission_ids_by_role.items() |
||
773 | } |
||
774 | |||
775 | |||
776 | 1 | @blueprint.post('/<uuid:user_id>/roles/<role_id>') |
|
777 | 1 | @permission_required('role.assign') |
|
778 | 1 | @respond_no_content |
|
779 | def role_assign(user_id, role_id): |
||
780 | """Assign the role to the user.""" |
||
781 | user = _get_user_or_404(user_id) |
||
782 | role = _get_role_or_404(role_id) |
||
783 | initiator_id = g.user.id |
||
784 | |||
785 | authorization_service.assign_role_to_user( |
||
786 | role.id, user.id, initiator_id=initiator_id |
||
787 | ) |
||
788 | |||
789 | flash_success( |
||
790 | gettext( |
||
791 | '%(role_title)s has been assigned to "%(screen_name)s".', |
||
792 | screen_name=user.screen_name, |
||
793 | role_title=role.title, |
||
794 | ) |
||
795 | ) |
||
796 | |||
797 | |||
798 | 1 | @blueprint.delete('/<uuid:user_id>/roles/<role_id>') |
|
799 | 1 | @permission_required('role.assign') |
|
800 | 1 | @respond_no_content |
|
801 | def role_deassign(user_id, role_id): |
||
802 | """Deassign the role from the user.""" |
||
803 | user = _get_user_or_404(user_id) |
||
804 | role = _get_role_or_404(role_id) |
||
805 | initiator_id = g.user.id |
||
806 | |||
807 | authorization_service.deassign_role_from_user( |
||
808 | role.id, user.id, initiator_id=initiator_id |
||
809 | ) |
||
810 | |||
811 | flash_success( |
||
812 | gettext( |
||
813 | '%(role_title)s has been withdrawn from "%(screen_name)s".', |
||
814 | screen_name=user.screen_name, |
||
815 | role_title=role.title, |
||
816 | ) |
||
817 | ) |
||
818 | |||
819 | |||
820 | # -------------------------------------------------------------------- # |
||
821 | # events |
||
822 | |||
823 | |||
824 | 1 | @blueprint.get('/<uuid:user_id>/events') |
|
825 | 1 | @permission_required('user.view') |
|
826 | 1 | @templated |
|
827 | def view_events(user_id): |
||
828 | """Show user's events.""" |
||
829 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
830 | |||
831 | 1 | events = list(service.get_events(user.id)) |
|
832 | |||
833 | 1 | include_logins = request.args.get('include_logins', default='yes') == 'yes' |
|
834 | 1 | if not include_logins: |
|
835 | events = [ |
||
836 | event for event in events if event['event'] != 'user-logged-in' |
||
837 | ] |
||
838 | |||
839 | 1 | return { |
|
840 | 'profile_user': user, |
||
841 | 'user': user, |
||
842 | 'events': events, |
||
843 | 'logins_included': include_logins, |
||
844 | } |
||
845 | |||
846 | |||
847 | # -------------------------------------------------------------------- # |
||
848 | # helpers |
||
849 | |||
850 | |||
851 | 1 | def _get_user_for_admin_or_404(user_id) -> UserForAdmin: |
|
852 | 1 | user = user_service.find_user_for_admin(user_id) |
|
853 | |||
854 | 1 | if user is None: |
|
855 | abort(404) |
||
856 | |||
857 | 1 | return user |
|
858 | |||
859 | |||
860 | 1 | def _get_user_or_404(user_id): |
|
861 | user = user_service.find_user(user_id) |
||
862 | |||
863 | if user is None: |
||
864 | abort(404) |
||
865 | |||
866 | return user |
||
867 | |||
868 | |||
869 | 1 | def _get_role_or_404(role_id): |
|
870 | role = authorization_service.find_role(role_id) |
||
871 | |||
872 | if role is None: |
||
873 | abort(404) |
||
874 | |||
875 | return role |
||
876 |