Total Complexity | 66 |
Total Lines | 889 |
Duplicated Lines | 12.6 % |
Coverage | 54.55% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like byceps.blueprints.admin.user.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | byceps.blueprints.admin.user.views |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2014-2024 Jochen Kupperschmidt |
||
6 | :License: Revised BSD (see `LICENSE` file for details) |
||
7 | """ |
||
8 | |||
9 | 1 | from datetime import datetime |
|
10 | |||
11 | 1 | from flask import abort, g, request |
|
12 | from flask_babel import gettext |
||
13 | 1 | ||
14 | 1 | from byceps.services.authn.password import authn_password_service |
|
15 | from byceps.services.authn.session import authn_session_service |
||
16 | 1 | from byceps.services.authz import authz_service |
|
17 | 1 | from byceps.services.authz.models import Permission, PermissionID, Role |
|
18 | 1 | from byceps.services.country import country_service |
|
19 | 1 | from byceps.services.orga_team import orga_team_service |
|
20 | 1 | from byceps.services.shop.order import order_service |
|
21 | 1 | from byceps.services.shop.shop import shop_service |
|
22 | 1 | from byceps.services.site import site_service |
|
23 | 1 | from byceps.services.user import ( |
|
24 | 1 | user_command_service, |
|
25 | 1 | user_creation_service, |
|
26 | user_deletion_service, |
||
27 | user_email_address_service, |
||
28 | user_service, |
||
29 | ) |
||
30 | from byceps.services.user.models.user import UserFilter, UserForAdmin |
||
31 | from byceps.services.user_badge import user_badge_awarding_service |
||
32 | 1 | from byceps.signals import authn as authn_signals |
|
33 | 1 | from byceps.signals import authz as authz_signals |
|
34 | 1 | from byceps.signals import user as user_signals |
|
35 | 1 | from byceps.util.authz import permission_registry |
|
36 | 1 | from byceps.util.framework.blueprint import create_blueprint |
|
37 | 1 | from byceps.util.framework.flash import flash_error, flash_success |
|
38 | 1 | from byceps.util.framework.templating import templated |
|
39 | 1 | from byceps.util.views import ( |
|
40 | 1 | permission_required, |
|
41 | 1 | redirect_to, |
|
42 | respond_no_content, |
||
43 | ) |
||
44 | |||
45 | from . import service |
||
46 | from .forms import ( |
||
47 | 1 | ACCOUNT_DELETION_VERIFICATION_TEXT, |
|
48 | 1 | ChangeDetailsForm, |
|
49 | ChangeEmailAddressForm, |
||
50 | ChangeScreenNameForm, |
||
51 | CreateAccountForm, |
||
52 | DeleteAccountForm, |
||
53 | InvalidateEmailAddressForm, |
||
54 | SetPasswordForm, |
||
55 | SuspendAccountForm, |
||
56 | ) |
||
57 | |||
58 | |||
59 | blueprint = create_blueprint('user_admin', __name__) |
||
60 | |||
61 | 1 | ||
62 | @blueprint.get('/', defaults={'page': 1}) |
||
63 | @blueprint.get('/pages/<int:page>') |
||
64 | 1 | @permission_required('user.view') |
|
65 | 1 | @templated |
|
66 | 1 | def index(page): |
|
67 | 1 | """List users.""" |
|
68 | 1 | per_page = request.args.get('per_page', type=int, default=20) |
|
69 | search_term = request.args.get('search_term', default='').strip() |
||
70 | 1 | only = request.args.get('only') |
|
71 | 1 | ||
72 | 1 | user_filter = UserFilter.__members__.get(only, UserFilter.none) |
|
73 | |||
74 | 1 | users = user_service.get_users_paginated( |
|
75 | page, |
||
76 | per_page, |
||
77 | search_term=search_term, |
||
78 | 1 | user_filter=user_filter, |
|
79 | ) |
||
80 | |||
81 | user_ids = {user.id for user in users.items} |
||
82 | 1 | recent_login_datetimes_by_user_id = ( |
|
83 | authn_session_service.find_recent_logins_for_users(user_ids) |
||
84 | ) |
||
85 | 1 | ||
86 | users.items = [ |
||
87 | 1 | (user, recent_login_datetimes_by_user_id.get(user.id)) for user in users |
|
88 | ] |
||
89 | |||
90 | return { |
||
91 | 'users': users, |
||
92 | 'search_term': search_term, |
||
93 | 'only': only, |
||
94 | 'UserFilter': UserFilter, |
||
95 | 'user_filter': user_filter, |
||
96 | } |
||
97 | 1 | ||
98 | 1 | ||
99 | 1 | @blueprint.get('/<uuid:user_id>') |
|
100 | 1 | @permission_required('user.view') |
|
101 | @templated |
||
102 | 1 | def view(user_id): |
|
103 | 1 | """Show a user's internal profile.""" |
|
104 | user = _get_user_for_admin_or_404(user_id) |
||
105 | 1 | db_user = user_service.find_user_with_details(user.id) |
|
106 | 1 | ||
107 | recent_login = authn_session_service.find_recent_login(user.id) |
||
108 | 1 | days_since_recent_login = _calculate_days_since(recent_login) |
|
109 | |||
110 | 1 | orga_activities = orga_team_service.get_orga_activities_for_user(user.id) |
|
111 | |||
112 | newsletter_subscription_states = list( |
||
113 | 1 | service.get_newsletter_subscription_states(user.id) |
|
114 | ) |
||
115 | newsletter_subscription_count = sum( |
||
116 | 1 for _, subscribed in newsletter_subscription_states if subscribed |
||
117 | 1 | ) |
|
118 | |||
119 | 1 | orders = order_service.get_orders_placed_by_user(user.id) |
|
120 | 1 | ||
121 | 1 | order_shop_ids = {order.shop_id for order in orders} |
|
122 | shops = shop_service.find_shops(order_shop_ids) |
||
123 | 1 | shops_by_id = {shop.id: shop for shop in shops} |
|
124 | 1 | ||
125 | parties_and_tickets = service.get_parties_and_tickets(user.id) |
||
126 | 1 | ticket_count = sum(len(tickets) for _, tickets in parties_and_tickets) |
|
127 | |||
128 | 1 | attended_parties = service.get_attended_parties(user.id) |
|
129 | |||
130 | badges_with_awarding_quantity = ( |
||
131 | 1 | user_badge_awarding_service.get_badges_awarded_to_user(user.id) |
|
132 | ) |
||
133 | 1 | badge_count = len(badges_with_awarding_quantity) |
|
134 | |||
135 | return { |
||
136 | 'profile_user': user, |
||
137 | 'user': db_user, |
||
138 | 'recent_login': recent_login, |
||
139 | 'days_since_recent_login': days_since_recent_login, |
||
140 | 'orga_activities': orga_activities, |
||
141 | 'newsletter_subscription_count': newsletter_subscription_count, |
||
142 | 'newsletter_subscription_states': newsletter_subscription_states, |
||
143 | 'orders': orders, |
||
144 | 'shops_by_id': shops_by_id, |
||
145 | 'parties_and_tickets': parties_and_tickets, |
||
146 | 'ticket_count': ticket_count, |
||
147 | 'attended_parties': attended_parties, |
||
148 | 'badge_count': badge_count, |
||
149 | 'badges_with_awarding_quantity': badges_with_awarding_quantity, |
||
150 | } |
||
151 | 1 | ||
152 | 1 | ||
153 | 1 | def _calculate_days_since(dt: datetime | None) -> int | None: |
|
154 | if dt is None: |
||
155 | return None |
||
156 | |||
157 | return (datetime.utcnow().date() - dt.date()).days |
||
158 | |||
159 | |||
160 | # -------------------------------------------------------------------- # |
||
161 | # account |
||
162 | 1 | ||
163 | 1 | ||
164 | 1 | @blueprint.get('/create') |
|
165 | 1 | @permission_required('user.create') |
|
166 | @templated |
||
167 | 1 | def create_account_form(erroneous_form=None): |
|
168 | 1 | """Show a form to create a user account.""" |
|
169 | form = erroneous_form if erroneous_form else CreateAccountForm() |
||
170 | 1 | form.set_site_choices() |
|
171 | |||
172 | return {'form': form} |
||
173 | 1 | ||
174 | 1 | ||
175 | 1 | @blueprint.post('/') |
|
176 | @permission_required('user.create') |
||
177 | def create_account(): |
||
178 | """Create a user account.""" |
||
179 | form = CreateAccountForm(request.form) |
||
180 | form.set_site_choices() |
||
181 | |||
182 | if not form.validate(): |
||
183 | return create_account_form(form) |
||
184 | |||
185 | screen_name = form.screen_name.data.strip() |
||
186 | first_name = form.first_name.data.strip() |
||
187 | last_name = form.last_name.data.strip() |
||
188 | email_address = form.email_address.data.lower() |
||
189 | password = form.password.data |
||
190 | site_id_for_email = form.site_id.data |
||
191 | |||
192 | if site_id_for_email: |
||
193 | site_for_email = site_service.get_site(site_id_for_email) |
||
194 | else: |
||
195 | site_for_email = None |
||
196 | |||
197 | initiator = g.user |
||
198 | |||
199 | creation_result = user_creation_service.create_user( |
||
200 | screen_name, |
||
201 | email_address, |
||
202 | password, |
||
203 | first_name=first_name, |
||
204 | last_name=last_name, |
||
205 | creation_method='admin app', |
||
206 | creator=initiator, |
||
207 | # Do not pass site ID here; the account is not created on a site. |
||
208 | ip_address=request.remote_addr, |
||
209 | ) |
||
210 | if creation_result.is_err(): |
||
211 | flash_error( |
||
212 | gettext( |
||
213 | 'User "%(screen_name)s" could not be created.', |
||
214 | screen_name=screen_name, |
||
215 | ) |
||
216 | ) |
||
217 | return create_account_form(form) |
||
218 | |||
219 | user, event = creation_result.unwrap() |
||
220 | |||
221 | flash_success( |
||
222 | gettext( |
||
223 | 'User "%(screen_name)s" has been created.', |
||
224 | screen_name=user.screen_name, |
||
225 | ) |
||
226 | ) |
||
227 | |||
228 | if site_for_email: |
||
229 | user_creation_service.request_email_address_confirmation( |
||
230 | user, email_address, site_for_email.id |
||
231 | ).unwrap() |
||
232 | flash_success( |
||
233 | gettext('An email has been sent to the corresponding address.'), |
||
234 | icon='email', |
||
235 | ) |
||
236 | |||
237 | user_signals.account_created.send(None, event=event) |
||
238 | |||
239 | return redirect_to('.view', user_id=user.id) |
||
240 | 1 | ||
241 | 1 | ||
242 | 1 | @blueprint.post('/<uuid:user_id>/initialize') |
|
243 | 1 | @permission_required('user.administrate') |
|
244 | @respond_no_content |
||
245 | def initialize_account(user_id): |
||
246 | """Initialize the user account.""" |
||
247 | user = _get_user_or_404(user_id) |
||
248 | |||
249 | initiator = g.user |
||
250 | |||
251 | user_command_service.initialize_account(user, initiator=initiator) |
||
252 | |||
253 | flash_success( |
||
254 | gettext( |
||
255 | "User '%(screen_name)s' has been initialized.", |
||
256 | screen_name=user.screen_name, |
||
257 | ) |
||
258 | ) |
||
259 | 1 | ||
260 | 1 | ||
261 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/suspend') |
262 | 1 | @permission_required('user.administrate') |
|
263 | @templated |
||
264 | 1 | def suspend_account_form(user_id, erroneous_form=None): |
|
265 | """Show form to suspend the user account.""" |
||
266 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
267 | 1 | ||
268 | if user.suspended: |
||
269 | flash_error( |
||
270 | gettext( |
||
271 | "User '%(screen_name)s' is already suspended.", |
||
272 | screen_name=user.screen_name, |
||
273 | 1 | ) |
|
274 | ) |
||
275 | 1 | return redirect_to('.view', user_id=user.id) |
|
276 | |||
277 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
278 | |||
279 | return { |
||
280 | 'profile_user': user, |
||
281 | 'user': user, |
||
282 | 'form': form, |
||
283 | } |
||
284 | 1 | ||
285 | 1 | ||
286 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/suspend') |
287 | @permission_required('user.administrate') |
||
288 | def suspend_account(user_id): |
||
289 | """Suspend the user account.""" |
||
290 | user = _get_user_or_404(user_id) |
||
291 | |||
292 | if user.suspended: |
||
293 | flash_error( |
||
294 | gettext( |
||
295 | "User '%(screen_name)s' is already suspended.", |
||
296 | screen_name=user.screen_name, |
||
297 | ) |
||
298 | ) |
||
299 | return redirect_to('.view', user_id=user.id) |
||
300 | |||
301 | form = SuspendAccountForm(request.form) |
||
302 | if not form.validate(): |
||
303 | return suspend_account_form(user.id, form) |
||
304 | |||
305 | initiator = g.user |
||
306 | reason = form.reason.data.strip() |
||
307 | |||
308 | event = user_command_service.suspend_account(user, initiator, reason) |
||
309 | |||
310 | user_signals.account_suspended.send(None, event=event) |
||
311 | |||
312 | flash_success( |
||
313 | gettext( |
||
314 | "User '%(screen_name)s' has been suspended.", |
||
315 | screen_name=user.screen_name, |
||
316 | ) |
||
317 | ) |
||
318 | |||
319 | return redirect_to('.view', user_id=user.id) |
||
320 | 1 | ||
321 | 1 | ||
322 | 1 | View Code Duplication | @blueprint.get('/<uuid:user_id>/unsuspend') |
323 | 1 | @permission_required('user.administrate') |
|
324 | @templated |
||
325 | 1 | def unsuspend_account_form(user_id, erroneous_form=None): |
|
326 | """Show form to unsuspend the user account.""" |
||
327 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
328 | 1 | ||
329 | if not user.suspended: |
||
330 | flash_error( |
||
331 | gettext( |
||
332 | "User '%(screen_name)s' is not suspended.", |
||
333 | screen_name=user.screen_name, |
||
334 | 1 | ) |
|
335 | ) |
||
336 | 1 | return redirect_to('.view', user_id=user.id) |
|
337 | |||
338 | 1 | form = erroneous_form if erroneous_form else SuspendAccountForm() |
|
339 | |||
340 | return { |
||
341 | 'profile_user': user, |
||
342 | 'user': user, |
||
343 | 'form': form, |
||
344 | } |
||
345 | 1 | ||
346 | 1 | ||
347 | 1 | View Code Duplication | @blueprint.post('/<uuid:user_id>/unsuspend') |
348 | @permission_required('user.administrate') |
||
349 | def unsuspend_account(user_id): |
||
350 | """Unsuspend the user account.""" |
||
351 | user = _get_user_or_404(user_id) |
||
352 | |||
353 | if not user.suspended: |
||
354 | flash_error( |
||
355 | gettext( |
||
356 | "User '%(screen_name)s' is not suspended.", |
||
357 | screen_name=user.screen_name, |
||
358 | ) |
||
359 | ) |
||
360 | return redirect_to('.view', user_id=user.id) |
||
361 | |||
362 | form = SuspendAccountForm(request.form) |
||
363 | if not form.validate(): |
||
364 | return unsuspend_account_form(user.id, form) |
||
365 | |||
366 | initiator = g.user |
||
367 | reason = form.reason.data.strip() |
||
368 | |||
369 | event = user_command_service.unsuspend_account(user, initiator, reason) |
||
370 | |||
371 | user_signals.account_unsuspended.send(None, event=event) |
||
372 | |||
373 | flash_success( |
||
374 | gettext( |
||
375 | "User '%(screen_name)s' has been unsuspended.", |
||
376 | screen_name=user.screen_name, |
||
377 | ) |
||
378 | ) |
||
379 | |||
380 | return redirect_to('.view', user_id=user.id) |
||
381 | 1 | ||
382 | 1 | ||
383 | 1 | @blueprint.get('/<uuid:user_id>/delete') |
|
384 | 1 | @permission_required('user.administrate') |
|
385 | @templated |
||
386 | 1 | def delete_account_form(user_id, erroneous_form=None): |
|
387 | """Show form to delete the user account.""" |
||
388 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
389 | 1 | ||
390 | if user.deleted: |
||
391 | flash_error( |
||
392 | gettext( |
||
393 | "User '%(screen_name)s' has already been deleted.", |
||
394 | screen_name=user.screen_name, |
||
395 | 1 | ) |
|
396 | ) |
||
397 | 1 | return redirect_to('.view', user_id=user.id) |
|
398 | |||
399 | 1 | form = erroneous_form if erroneous_form else DeleteAccountForm() |
|
400 | |||
401 | return { |
||
402 | 'profile_user': user, |
||
403 | 'user': user, |
||
404 | 'form': form, |
||
405 | 'verification_text': ACCOUNT_DELETION_VERIFICATION_TEXT, |
||
406 | } |
||
407 | 1 | ||
408 | 1 | ||
409 | 1 | @blueprint.post('/<uuid:user_id>/delete') |
|
410 | @permission_required('user.administrate') |
||
411 | def delete_account(user_id): |
||
412 | """Delete the user account.""" |
||
413 | user = _get_user_or_404(user_id) |
||
414 | |||
415 | if user.deleted: |
||
416 | flash_error( |
||
417 | gettext( |
||
418 | "User '%(screen_name)s' has already been deleted.", |
||
419 | screen_name=user.screen_name, |
||
420 | ) |
||
421 | ) |
||
422 | return redirect_to('.view', user_id=user.id) |
||
423 | |||
424 | form = DeleteAccountForm(request.form) |
||
425 | if not form.validate(): |
||
426 | return delete_account_form(user.id, form) |
||
427 | |||
428 | initiator = g.user |
||
429 | reason = form.reason.data.strip() |
||
430 | |||
431 | event = user_deletion_service.delete_account(user, initiator, reason) |
||
432 | |||
433 | user_signals.account_deleted.send(None, event=event) |
||
434 | |||
435 | flash_success( |
||
436 | gettext( |
||
437 | "User '%(screen_name)s' has been deleted.", |
||
438 | screen_name=user.screen_name, |
||
439 | ) |
||
440 | ) |
||
441 | |||
442 | return redirect_to('.view', user_id=user.id) |
||
443 | |||
444 | |||
445 | # -------------------------------------------------------------------- # |
||
446 | # screen name |
||
447 | 1 | ||
448 | 1 | ||
449 | 1 | @blueprint.get('/<uuid:user_id>/change_screen_name') |
|
450 | 1 | @permission_required('user.administrate') |
|
451 | @templated |
||
452 | 1 | def change_screen_name_form(user_id, erroneous_form=None): |
|
453 | """Show form to change the user's screen name.""" |
||
454 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
455 | |||
456 | 1 | form = erroneous_form if erroneous_form else ChangeScreenNameForm() |
|
457 | |||
458 | return { |
||
459 | 'profile_user': user, |
||
460 | 'user': user, |
||
461 | 'form': form, |
||
462 | } |
||
463 | 1 | ||
464 | 1 | ||
465 | 1 | @blueprint.post('/<uuid:user_id>/change_screen_name') |
|
466 | @permission_required('user.administrate') |
||
467 | def change_screen_name(user_id): |
||
468 | """Change the user's screen name.""" |
||
469 | user = _get_user_or_404(user_id) |
||
470 | |||
471 | form = ChangeScreenNameForm(request.form) |
||
472 | if not form.validate(): |
||
473 | return change_screen_name_form(user.id, form) |
||
474 | |||
475 | old_screen_name = user.screen_name |
||
476 | new_screen_name = form.screen_name.data.strip() |
||
477 | initiator = g.user |
||
478 | reason = form.reason.data.strip() |
||
479 | |||
480 | event = user_command_service.change_screen_name( |
||
481 | user, new_screen_name, initiator, reason=reason |
||
482 | ) |
||
483 | |||
484 | user_signals.screen_name_changed.send(None, event=event) |
||
485 | |||
486 | flash_success( |
||
487 | gettext( |
||
488 | "User '%(old_screen_name)s' has been renamed to '%(new_screen_name)s'.", |
||
489 | old_screen_name=old_screen_name, |
||
490 | new_screen_name=new_screen_name, |
||
491 | ) |
||
492 | ) |
||
493 | |||
494 | return redirect_to('.view', user_id=user.id) |
||
495 | |||
496 | |||
497 | # -------------------------------------------------------------------- # |
||
498 | # email address |
||
499 | 1 | ||
500 | 1 | ||
501 | 1 | @blueprint.get('/<uuid:user_id>/change_email_address') |
|
502 | 1 | @permission_required('user.administrate') |
|
503 | @templated |
||
504 | 1 | def change_email_address_form(user_id, erroneous_form=None): |
|
505 | """Show form to change the user's e-mail address.""" |
||
506 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
507 | |||
508 | 1 | form = erroneous_form if erroneous_form else ChangeEmailAddressForm() |
|
509 | |||
510 | return { |
||
511 | 'profile_user': user, |
||
512 | 'user': user, |
||
513 | 'form': form, |
||
514 | } |
||
515 | 1 | ||
516 | 1 | ||
517 | 1 | @blueprint.post('/<uuid:user_id>/change_email_address') |
|
518 | @permission_required('user.administrate') |
||
519 | def change_email_address(user_id): |
||
520 | """Change the user's e-mail address.""" |
||
521 | user = _get_user_or_404(user_id) |
||
522 | |||
523 | form = ChangeEmailAddressForm(request.form) |
||
524 | if not form.validate(): |
||
525 | return change_email_address_form(user.id, form) |
||
526 | |||
527 | new_email_address = form.email_address.data.strip() |
||
528 | verified = False |
||
529 | initiator = g.user |
||
530 | reason = form.reason.data.strip() |
||
531 | |||
532 | event = user_command_service.change_email_address( |
||
533 | user, new_email_address, verified, initiator, reason=reason |
||
534 | ) |
||
535 | |||
536 | user_signals.email_address_changed.send(None, event=event) |
||
537 | |||
538 | flash_success( |
||
539 | gettext( |
||
540 | "Email address for user '%(screen_name)s' has been updated.", |
||
541 | screen_name=user.screen_name, |
||
542 | ) |
||
543 | ) |
||
544 | |||
545 | return redirect_to('.view', user_id=user.id) |
||
546 | 1 | ||
547 | 1 | ||
548 | 1 | @blueprint.get('/<uuid:user_id>/invalidate_email_address') |
|
549 | 1 | @permission_required('user.administrate') |
|
550 | @templated |
||
551 | def invalidate_email_address_form(user_id, erroneous_form=None): |
||
552 | """Show form to invalidate the email address assigned with the account.""" |
||
553 | user = _get_user_for_admin_or_404(user_id) |
||
554 | |||
555 | email_address = user_service.get_email_address_data(user_id) |
||
556 | if not email_address.verified: |
||
557 | flash_error(gettext('Email address is already invalidated.')) |
||
558 | return redirect_to('.view', user_id=user.id) |
||
559 | |||
560 | form = erroneous_form if erroneous_form else InvalidateEmailAddressForm() |
||
561 | |||
562 | return { |
||
563 | 'profile_user': user, |
||
564 | 'user': user, |
||
565 | 'form': form, |
||
566 | } |
||
567 | 1 | ||
568 | 1 | ||
569 | 1 | @blueprint.post('/<uuid:user_id>/invalidate_email_address') |
|
570 | @permission_required('user.administrate') |
||
571 | def invalidate_email_address(user_id): |
||
572 | """Invalidate the email address assigned with the account.""" |
||
573 | user = _get_user_or_404(user_id) |
||
574 | |||
575 | form = InvalidateEmailAddressForm(request.form) |
||
576 | if not form.validate(): |
||
577 | return invalidate_email_address_form(user.id, form) |
||
578 | |||
579 | initiator = g.user |
||
580 | reason = form.reason.data.strip() |
||
581 | |||
582 | invalidation_result = user_email_address_service.invalidate_email_address( |
||
583 | user, reason, initiator=initiator |
||
584 | ) |
||
585 | |||
586 | if invalidation_result.is_err(): |
||
587 | flash_error(invalidation_result.unwrap_err()) |
||
588 | return redirect_to('.view', user_id=user.id) |
||
589 | |||
590 | event = invalidation_result.unwrap() |
||
591 | |||
592 | user_signals.email_address_invalidated.send(None, event=event) |
||
593 | |||
594 | flash_success( |
||
595 | gettext( |
||
596 | "The email address of user '%(screen_name)s' has been invalidated.", |
||
597 | screen_name=user.screen_name, |
||
598 | ) |
||
599 | ) |
||
600 | |||
601 | return redirect_to('.view', user_id=user.id) |
||
602 | |||
603 | |||
604 | # -------------------------------------------------------------------- # |
||
605 | # details |
||
606 | 1 | ||
607 | 1 | ||
608 | 1 | @blueprint.get('/<uuid:user_id>/details') |
|
609 | 1 | @permission_required('user.administrate') |
|
610 | @templated |
||
611 | def change_details_form(user_id, erroneous_form=None): |
||
612 | """Show a form to change the user's details.""" |
||
613 | user = _get_user_for_admin_or_404(user_id) |
||
614 | |||
615 | detail = user_service.get_detail(user_id) |
||
616 | |||
617 | form = erroneous_form if erroneous_form else ChangeDetailsForm(obj=detail) |
||
618 | country_names = country_service.get_country_names() |
||
619 | |||
620 | return { |
||
621 | 'profile_user': user, |
||
622 | 'user': user, |
||
623 | 'form': form, |
||
624 | 'country_names': country_names, |
||
625 | } |
||
626 | 1 | ||
627 | 1 | ||
628 | 1 | @blueprint.post('/<uuid:user_id>/details') |
|
629 | @permission_required('user.administrate') |
||
630 | def change_details(user_id): |
||
631 | """Change the user's details.""" |
||
632 | user = _get_user_or_404(user_id) |
||
633 | |||
634 | form = ChangeDetailsForm(request.form) |
||
635 | if not form.validate(): |
||
636 | return change_details_form(user.id, form) |
||
637 | |||
638 | first_name = form.first_name.data.strip() |
||
639 | last_name = form.last_name.data.strip() |
||
640 | date_of_birth = form.date_of_birth.data |
||
641 | country = form.country.data.strip() |
||
642 | zip_code = form.zip_code.data.strip() |
||
643 | city = form.city.data.strip() |
||
644 | street = form.street.data.strip() |
||
645 | phone_number = form.phone_number.data.strip() |
||
646 | initiator = g.user |
||
647 | |||
648 | event = user_command_service.update_user_details( |
||
649 | user.id, |
||
650 | first_name, |
||
651 | last_name, |
||
652 | date_of_birth, |
||
653 | country, |
||
654 | zip_code, |
||
655 | city, |
||
656 | street, |
||
657 | phone_number, |
||
658 | initiator, |
||
659 | ) |
||
660 | |||
661 | flash_success(gettext('Changes have been saved.')) |
||
662 | |||
663 | user_signals.details_updated.send(None, event=event) |
||
664 | |||
665 | return redirect_to('.view', user_id=user.id) |
||
666 | |||
667 | |||
668 | # -------------------------------------------------------------------- # |
||
669 | # authentication |
||
670 | 1 | ||
671 | 1 | ||
672 | 1 | @blueprint.get('/<uuid:user_id>/password') |
|
673 | 1 | @permission_required('user.set_password') |
|
674 | @templated |
||
675 | 1 | def set_password_form(user_id, erroneous_form=None): |
|
676 | """Show a form to set a new password for the user.""" |
||
677 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
678 | |||
679 | 1 | form = erroneous_form if erroneous_form else SetPasswordForm() |
|
680 | |||
681 | return { |
||
682 | 'profile_user': user, |
||
683 | 'user': user, |
||
684 | 'form': form, |
||
685 | } |
||
686 | 1 | ||
687 | 1 | ||
688 | 1 | @blueprint.post('/<uuid:user_id>/password') |
|
689 | @permission_required('user.set_password') |
||
690 | def set_password(user_id): |
||
691 | """Set a new password for the user.""" |
||
692 | user = _get_user_or_404(user_id) |
||
693 | |||
694 | form = SetPasswordForm(request.form) |
||
695 | if not form.validate(): |
||
696 | return set_password_form(user.id, form) |
||
697 | |||
698 | new_password = form.password.data |
||
699 | initiator = g.user |
||
700 | |||
701 | event = authn_password_service.update_password_hash( |
||
702 | user, new_password, initiator |
||
703 | ) |
||
704 | |||
705 | authn_signals.password_updated.send(None, event=event) |
||
706 | |||
707 | flash_success( |
||
708 | gettext( |
||
709 | "New password has been set for user '%(screen_name)s'.", |
||
710 | screen_name=user.screen_name, |
||
711 | ) |
||
712 | ) |
||
713 | |||
714 | return redirect_to('.view', user_id=user.id) |
||
715 | |||
716 | |||
717 | # -------------------------------------------------------------------- # |
||
718 | # authorization |
||
719 | 1 | ||
720 | 1 | ||
721 | 1 | @blueprint.get('/<uuid:user_id>/permissions') |
|
722 | 1 | @permission_required('user.view') |
|
723 | @templated |
||
724 | 1 | def view_permissions(user_id): |
|
725 | """Show user's permissions.""" |
||
726 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
727 | |||
728 | user_permission_ids_by_role = ( |
||
729 | authz_service.get_permission_ids_by_role_for_user(user.id) |
||
730 | 1 | ) |
|
731 | |||
732 | permissions_by_role = _index_permissions_by_role( |
||
733 | user_permission_ids_by_role |
||
734 | 1 | ) |
|
735 | |||
736 | return { |
||
737 | 'profile_user': user, |
||
738 | 'user': user, |
||
739 | 'permissions_by_role': permissions_by_role, |
||
740 | } |
||
741 | 1 | ||
742 | 1 | ||
743 | 1 | @blueprint.get('/<uuid:user_id>/roles/assignment') |
|
744 | 1 | @permission_required('role.assign') |
|
745 | @templated |
||
746 | 1 | def manage_roles(user_id): |
|
747 | """Manage what roles are assigned to the user.""" |
||
748 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
749 | |||
750 | 1 | permission_ids_by_role = authz_service.get_permission_ids_by_role() |
|
751 | |||
752 | 1 | permissions_by_role = _index_permissions_by_role(permission_ids_by_role) |
|
753 | |||
754 | 1 | user_role_ids = authz_service.find_role_ids_for_user(user.id) |
|
755 | |||
756 | return { |
||
757 | 'profile_user': user, |
||
758 | 'user': user, |
||
759 | 'permissions_by_role': permissions_by_role, |
||
760 | 'user_role_ids': user_role_ids, |
||
761 | } |
||
762 | 1 | ||
763 | |||
764 | def _index_permissions_by_role( |
||
765 | 1 | permission_ids_by_role: dict[Role, frozenset[PermissionID]], |
|
766 | ) -> dict[Role, frozenset[Permission]]: |
||
767 | registered_permissions_by_id = { |
||
768 | permission.id: permission |
||
769 | for permission in permission_registry.get_registered_permissions() |
||
770 | 1 | } |
|
771 | |||
772 | return { |
||
773 | role: frozenset( |
||
774 | registered_permissions_by_id[permission_id] |
||
775 | for permission_id in permission_ids |
||
776 | if permission_id in registered_permissions_by_id |
||
777 | ) |
||
778 | for role, permission_ids in permission_ids_by_role.items() |
||
779 | } |
||
780 | 1 | ||
781 | 1 | ||
782 | 1 | @blueprint.post('/<uuid:user_id>/roles/<role_id>') |
|
783 | 1 | @permission_required('role.assign') |
|
784 | @respond_no_content |
||
785 | def role_assign(user_id, role_id): |
||
786 | """Assign the role to the user.""" |
||
787 | user = _get_user_or_404(user_id) |
||
788 | role = _get_role_or_404(role_id) |
||
789 | initiator = g.user |
||
790 | |||
791 | event = authz_service.assign_role_to_user( |
||
792 | role.id, user, initiator=initiator |
||
793 | ) |
||
794 | |||
795 | if event is not None: |
||
796 | authz_signals.role_assigned_to_user.send(None, event=event) |
||
797 | |||
798 | flash_success( |
||
799 | gettext( |
||
800 | '%(role_title)s has been assigned to "%(screen_name)s".', |
||
801 | screen_name=user.screen_name, |
||
802 | role_title=role.title, |
||
803 | ) |
||
804 | ) |
||
805 | 1 | ||
806 | 1 | ||
807 | 1 | @blueprint.delete('/<uuid:user_id>/roles/<role_id>') |
|
808 | 1 | @permission_required('role.assign') |
|
809 | @respond_no_content |
||
810 | def role_deassign(user_id, role_id): |
||
811 | """Deassign the role from the user.""" |
||
812 | user = _get_user_or_404(user_id) |
||
813 | role = _get_role_or_404(role_id) |
||
814 | initiator = g.user |
||
815 | |||
816 | event = authz_service.deassign_role_from_user( |
||
817 | role.id, user, initiator=initiator |
||
818 | ).unwrap() |
||
819 | |||
820 | authz_signals.role_deassigned_from_user.send(None, event=event) |
||
821 | |||
822 | flash_success( |
||
823 | gettext( |
||
824 | '%(role_title)s has been withdrawn from "%(screen_name)s".', |
||
825 | screen_name=user.screen_name, |
||
826 | role_title=role.title, |
||
827 | ) |
||
828 | ) |
||
829 | |||
830 | |||
831 | # -------------------------------------------------------------------- # |
||
832 | # events |
||
833 | 1 | ||
834 | 1 | ||
835 | 1 | @blueprint.get('/<uuid:user_id>/events') |
|
836 | 1 | @permission_required('user.view') |
|
837 | @templated |
||
838 | 1 | def view_events(user_id): |
|
839 | """Show user's events.""" |
||
840 | 1 | user = _get_user_for_admin_or_404(user_id) |
|
841 | |||
842 | 1 | log_entries = list(service.get_log_entries(user.id)) |
|
843 | 1 | ||
844 | include_logins = request.args.get('include_logins', default='yes') == 'yes' |
||
845 | if not include_logins: |
||
846 | log_entries = [ |
||
847 | entry |
||
848 | for entry in log_entries |
||
849 | if entry['event_type'] != 'user-logged-in' |
||
850 | 1 | ] |
|
851 | |||
852 | return { |
||
853 | 'profile_user': user, |
||
854 | 'user': user, |
||
855 | 'log_entries': log_entries, |
||
856 | 'logins_included': include_logins, |
||
857 | } |
||
858 | |||
859 | |||
860 | # -------------------------------------------------------------------- # |
||
861 | # helpers |
||
862 | 1 | ||
863 | 1 | ||
864 | def _get_user_for_admin_or_404(user_id) -> UserForAdmin: |
||
865 | 1 | user = user_service.find_user_for_admin(user_id) |
|
866 | |||
867 | if user is None: |
||
868 | 1 | abort(404) |
|
869 | |||
870 | return user |
||
871 | 1 | ||
872 | |||
873 | def _get_user_or_404(user_id): |
||
874 | user = user_service.find_user(user_id) |
||
875 | |||
876 | if user is None: |
||
877 | abort(404) |
||
878 | |||
879 | return user |
||
880 | 1 | ||
881 | |||
882 | def _get_role_or_404(role_id): |
||
883 | role = authz_service.find_role(role_id) |
||
884 | |||
885 | if role is None: |
||
886 | abort(404) |
||
887 | |||
888 | return role |
||
889 |