byceps.services.user.service   C
last analyzed

Complexity

Total Complexity 56

Size/Duplication

Total Lines 510
Duplicated Lines 0 %

Test Coverage

Coverage 83.67%

Importance

Changes 0
Metric Value
eloc 301
dl 0
loc 510
ccs 123
cts 147
cp 0.8367
rs 5.5199
c 0
b 0
f 0
wmc 56

31 Functions

Rating   Name   Duplication   Size   Complexity  
A find_screen_name() 0 11 2
A get_users_created_since() 0 23 2
A find_user_by_screen_name() 0 12 2
A get_users_paginated() 0 30 2
A find_active_user() 0 25 2
A find_user_by_email_address() 0 13 2
A is_email_address_already_assigned() 0 3 1
A get_email_address_data() 0 16 2
A find_user_with_details() 0 5 1
A _filter_by_search_term() 0 7 1
A get_user_for_admin() 0 8 2
A _get_user_query() 0 19 3
A _db_entity_to_user() 0 8 1
A get_user() 0 11 2
B _filter_by_state() 0 22 5
A find_db_user_by_screen_name() 0 14 2
A is_screen_name_already_assigned() 0 3 1
A _db_entity_to_user_for_admin() 0 14 3
A _generate_search_clauses_for_term() 0 8 1
A find_user() 0 17 2
A get_detail() 0 18 2
A _user_row_to_dto() 0 13 2
A get_users() 0 19 2
A find_email_address() 0 6 1
A get_email_address() 0 10 2
A get_db_user() 0 8 2
A get_email_addresses() 0 9 1
A index_users_by_id() 0 3 1
A find_user_for_admin() 0 16 2
A _do_users_matching_filter_exist() 0 15 1
A get_sort_key_for_screen_name() 0 10 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.user.service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.user.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime, timedelta
11 1
from typing import Optional
12
13 1
from sqlalchemy import select
14
15 1
from ...database import db, paginate, Pagination, Query
16 1
from ...typing import UserID
17
18 1
from ..user_avatar.dbmodels import (
19
    Avatar as DbAvatar,
20
    AvatarSelection as DbAvatarSelection,
21
)
22
23 1
from .dbmodels.detail import UserDetail as DbUserDetail
24 1
from .dbmodels.user import User as DbUser
25 1
from .transfer.models import (
26
    User,
27
    UserDetail,
28
    UserEmailAddress,
29
    UserForAdmin,
30
    UserForAdminDetail,
31
    UserStateFilter,
32
)
33
34
35 1
class UserIdRejected(Exception):
36
    """Indicate that the given user ID is not accepted.
37
38
    Reasons can include the user ID being
39
    - not well-formed,
40
    - unknown,
41
    or the associated account being
42
    - not yet initialized,
43
    - suspended,
44
    - or deleted.
45
    """
46
47
48 1
def find_active_user(
49
    user_id: UserID,
50
    *,
51
    include_avatar: bool = False,
52
) -> Optional[User]:
53
    """Return the user with that ID if the account is "active", or
54
    `None` if:
55
    - the ID is unknown.
56
    - the account has not been activated, yet.
57
    - the account is currently suspended.
58
    - the account is marked as deleted.
59
    """
60 1
    query = _get_user_query(include_avatar)
61
62 1
    row = query \
63
        .filter(DbUser.initialized == True) \
64
        .filter(DbUser.suspended == False) \
65
        .filter(DbUser.deleted == False) \
66
        .filter(DbUser.id == user_id) \
67
        .one_or_none()
68
69 1
    if row is None:
70 1
        return None
71
72 1
    return _user_row_to_dto(row)
73
74
75 1
def find_user(
76
    user_id: UserID,
77
    *,
78
    include_avatar: bool = False,
79
) -> Optional[User]:
80
    """Return the user with that ID, or `None` if not found.
81
82
    Include avatar URL if requested.
83
    """
84 1
    row = _get_user_query(include_avatar) \
85
        .filter(DbUser.id == user_id) \
86
        .one_or_none()
87
88 1
    if row is None:
89
        return None
90
91 1
    return _user_row_to_dto(row)
92
93
94 1
def get_user(user_id: UserID, *, include_avatar: bool = False) -> User:
95
    """Return the user with that ID, or raise an exception.
96
97
    Include avatar URL if requested.
98
    """
99 1
    user = find_user(user_id, include_avatar=include_avatar)
100
101 1
    if user is None:
102
        raise ValueError(f"Unknown user ID '{user_id}'")
103
104 1
    return user
105
106
107 1
def get_users(
108
    user_ids: set[UserID],
109
    *,
110
    include_avatars: bool = False,
111
) -> set[User]:
112
    """Return the users with those IDs.
113
114
    Their respective avatars' URLs are included, if requested.
115
    """
116 1
    if not user_ids:
117 1
        return set()
118
119 1
    query = _get_user_query(include_avatars)
120
121 1
    rows = query \
122
        .filter(DbUser.id.in_(frozenset(user_ids))) \
123
        .all()
124
125 1
    return {_user_row_to_dto(row) for row in rows}
126
127
128 1
def _get_user_query(
129
    include_avatar: bool,
130
) -> Query:
131 1
    query = db.session \
132
        .query(
133
            DbUser.id,
134
            DbUser.screen_name,
135
            DbUser.suspended,
136
            DbUser.deleted,
137
            DbUser.locale,
138
            DbAvatar if include_avatar else db.null(),
139
        )
140
141 1
    if include_avatar:
142 1
        query = query \
143
            .outerjoin(DbAvatarSelection, DbUser.avatar_selection) \
144
            .outerjoin(DbAvatar)
145
146 1
    return query
147
148
149 1
def _user_row_to_dto(
150
    row: tuple[UserID, str, bool, bool, Optional[str], Optional[DbAvatar]]
151
) -> User:
152 1
    user_id, screen_name, suspended, deleted, locale, avatar = row
153 1
    avatar_url = avatar.url if (avatar is not None) else None
154
155 1
    return User(
156
        id=user_id,
157
        screen_name=screen_name,
158
        suspended=suspended,
159
        deleted=deleted,
160
        locale=locale,
161
        avatar_url=avatar_url,
162
    )
163
164
165 1
def find_user_by_email_address(email_address: str) -> Optional[User]:
166
    """Return the user with that email address, or `None` if not found."""
167 1
    user = db.session \
168
        .query(DbUser) \
169
        .filter(
170
            db.func.lower(DbUser.email_address) == email_address.lower()
171
        ) \
172
        .one_or_none()
173
174 1
    if user is None:
175 1
        return None
176
177 1
    return _db_entity_to_user(user)
178
179
180 1
def find_user_by_screen_name(
181
    screen_name: str, *, case_insensitive=False
182
) -> Optional[User]:
183
    """Return the user with that screen name, or `None` if not found."""
184 1
    user = find_db_user_by_screen_name(
185
        screen_name, case_insensitive=case_insensitive
186
    )
187
188 1
    if user is None:
189 1
        return None
190
191 1
    return _db_entity_to_user(user)
192
193
194 1
def find_db_user_by_screen_name(
195
    screen_name: str, *, case_insensitive=False
196
) -> Optional[DbUser]:
197
    """Return the user with that screen name, or `None` if not found."""
198 1
    query = db.session.query(DbUser)
199
200 1
    if case_insensitive:
201 1
        query = query.filter(
202
            db.func.lower(DbUser.screen_name) == screen_name.lower()
203
        )
204
    else:
205 1
        query = query.filter_by(screen_name=screen_name)
206
207 1
    return query.one_or_none()
208
209
210 1
def find_user_with_details(user_id: UserID) -> Optional[DbUser]:
211
    """Return the user and its details."""
212 1
    return db.session.query(DbUser) \
213
        .options(db.joinedload(DbUser.detail)) \
214
        .get(user_id)
215
216
217 1
def get_db_user(user_id: UserID) -> DbUser:
218
    """Return the user with that ID, or raise an exception."""
219 1
    user = db.session.query(DbUser).get(user_id)
220
221 1
    if user is None:
222
        raise ValueError(f"Unknown user ID '{user_id}'")
223
224 1
    return user
225
226
227 1
def find_user_for_admin(user_id: UserID) -> Optional[UserForAdmin]:
228
    """Return the user with that ID, or `None` if not found."""
229 1
    user = db.session \
230
        .query(DbUser) \
231
        .options(
232
            db.joinedload(DbUser.avatar_selection)
233
                .joinedload(DbAvatarSelection.avatar),
234
            db.joinedload(DbUser.detail)
235
                .load_only(DbUserDetail.first_names, DbUserDetail.last_name),
236
        ) \
237
        .get(user_id)
238
239 1
    if user is None:
240
        return None
241
242 1
    return _db_entity_to_user_for_admin(user)
243
244
245 1
def get_user_for_admin(user_id: UserID) -> UserForAdmin:
246
    """Return the user with that ID, or raise an exception."""
247
    user = find_user_for_admin(user_id)
248
249
    if user is None:
250
        raise ValueError(f"Unknown user ID '{user_id}'")
251
252
    return user
253
254
255 1
def _db_entity_to_user(user: DbUser) -> User:
256 1
    return User(
257
        id=user.id,
258
        screen_name=user.screen_name,
259
        suspended=user.suspended,
260
        deleted=user.deleted,
261
        locale=user.locale,
262
        avatar_url=None,
263
    )
264
265
266 1
def _db_entity_to_user_for_admin(user: DbUser) -> UserForAdmin:
267 1
    full_name = user.detail.full_name if user.detail is not None else None
268 1
    detail = UserForAdminDetail(full_name=full_name)
269
270 1
    return UserForAdmin(
271
        id=user.id,
272
        screen_name=user.screen_name,
273
        suspended=user.suspended,
274
        deleted=user.deleted,
275
        locale=user.locale,
276
        avatar_url=user.avatar.url if user.avatar else None,
277
        created_at=user.created_at,
278
        initialized=user.initialized,
279
        detail=detail,
280
    )
281
282
283 1
def find_screen_name(user_id: UserID) -> Optional[str]:
284
    """Return the user's screen name, if available."""
285 1
    screen_name = db.session \
286
        .query(DbUser.screen_name) \
287
        .filter_by(id=user_id) \
288
        .scalar()
289
290 1
    if screen_name is None:
291 1
        return None
292
293 1
    return screen_name
294
295
296 1
def find_email_address(user_id: UserID) -> Optional[str]:
297
    """Return the user's e-mail address, if set."""
298 1
    return db.session \
299
        .query(DbUser.email_address) \
300
        .filter_by(id=user_id) \
301
        .scalar()
302
303
304 1
def get_email_address(user_id: UserID) -> str:
305
    """Return the user's e-mail address."""
306 1
    email_address = find_email_address(user_id)
307
308 1
    if email_address is None:
309 1
        raise ValueError(
310
            f"Unknown user ID '{user_id}' or user has no email address"
311
        )
312
313 1
    return email_address
314
315
316 1
def get_email_address_data(user_id: UserID) -> UserEmailAddress:
317
    """Return the user's e-mail address data."""
318
    row = db.session.execute(
319
        select(
320
            DbUser.email_address,
321
            DbUser.email_address_verified,
322
        ) \
323
        .filter_by(id=user_id)
324
    ).one_or_none()
325
326
    if row is None:
327
        raise ValueError(f"Unknown user ID '{user_id}'")
328
329
    return UserEmailAddress(
330
        address=row[0],
331
        verified=row[1],
332
    )
333
334
335 1
def get_email_addresses(user_ids: set[UserID]) -> set[tuple[UserID, str]]:
336
    """Return the users' e-mail addresses."""
337
    return db.session \
338
        .query(
339
            DbUser.id,
340
            DbUser.email_address,
341
        ) \
342
        .filter(DbUser.id.in_(user_ids)) \
343
        .all()
344
345
346 1
def get_detail(user_id: UserID) -> UserDetail:
347
    """Return the user's details."""
348 1
    detail = db.session.get(DbUserDetail, user_id)
349
350 1
    if detail is None:
351
        raise ValueError(f"Unknown user ID '{user_id}'")
352
353 1
    return UserDetail(
354
        first_names=detail.first_names,
355
        last_name=detail.last_name,
356
        date_of_birth=detail.date_of_birth,
357
        country=detail.country,
358
        zip_code=detail.zip_code,
359
        city=detail.city,
360
        street=detail.street,
361
        phone_number=detail.phone_number,
362
        internal_comment=detail.internal_comment,
363
        extras=detail.extras,
364
    )
365
366
367 1
def get_sort_key_for_screen_name(user: User) -> tuple[bool, str]:
368
    """Return a key for sorting by screen name.
369
370
    - Orders screen names case-insensitively.
371
    - Handles absent screen names (i.e. `None`) and places them at the
372
      end.
373
    """
374 1
    normalized_screen_name = (user.screen_name or '').lower()
375 1
    has_screen_name = bool(normalized_screen_name)
376 1
    return not has_screen_name, normalized_screen_name
377
378
379 1
def index_users_by_id(users: set[User]) -> dict[UserID, User]:
380
    """Map the users' IDs to the corresponding user objects."""
381 1
    return {user.id: user for user in users}
382
383
384 1
def is_screen_name_already_assigned(screen_name: str) -> bool:
385
    """Return `True` if a user with that screen name exists."""
386 1
    return _do_users_matching_filter_exist(DbUser.screen_name, screen_name)
387
388
389 1
def is_email_address_already_assigned(email_address: str) -> bool:
390
    """Return `True` if a user with that email address exists."""
391 1
    return _do_users_matching_filter_exist(DbUser.email_address, email_address)
392
393
394 1
def _do_users_matching_filter_exist(
395
    model_attribute: str, search_value: str
396
) -> bool:
397
    """Return `True` if any users match the filter.
398
399
    Comparison is done case-insensitively.
400
    """
401 1
    return db.session \
402
        .query(
403
            db.session \
404
                .query(DbUser) \
405
                .filter(db.func.lower(model_attribute) == search_value.lower()) \
406
                .exists()
407
        ) \
408
        .scalar()
409
410
411 1
def get_users_created_since(
412
    delta: timedelta, limit: Optional[int] = None
413
) -> list[UserForAdmin]:
414
    """Return the user accounts created since `delta` ago."""
415 1
    filter_starts_at = datetime.utcnow() - delta
416
417 1
    query = db.session \
418
        .query(DbUser) \
419
        .options(
420
            db.joinedload(DbUser.avatar_selection)
421
                .joinedload(DbAvatarSelection.avatar),
422
            db.joinedload(DbUser.detail)
423
                .load_only(DbUserDetail.first_names, DbUserDetail.last_name),
424
        ) \
425
        .filter(DbUser.created_at >= filter_starts_at) \
426
        .order_by(DbUser.created_at.desc())
427
428 1
    if limit is not None:
429 1
        query = query.limit(limit)
430
431 1
    users = query.all()
432
433 1
    return [_db_entity_to_user_for_admin(u) for u in users]
434
435
436 1
def get_users_paginated(
437
    page: int,
438
    per_page: int,
439
    *,
440
    search_term: Optional[str] = None,
441
    state_filter: Optional[UserStateFilter] = None,
442
) -> Pagination:
443
    """Return the users to show on the specified page, optionally
444
    filtered by search term or flags.
445
    """
446 1
    query = db.session \
447
        .query(DbUser) \
448
        .options(
449
            db.joinedload(DbUser.avatar_selection)
450
                .joinedload(DbAvatarSelection.avatar),
451
            db.joinedload(DbUser.detail)
452
                .load_only(DbUserDetail.first_names, DbUserDetail.last_name),
453
        ) \
454
        .order_by(DbUser.created_at.desc())
455
456 1
    query = _filter_by_state(query, state_filter)
457
458 1
    if search_term:
459
        query = _filter_by_search_term(query, search_term)
460
461 1
    return paginate(
462
        query,
463
        page,
464
        per_page,
465
        item_mapper=_db_entity_to_user_for_admin,
466
    )
467
468
469 1
def _filter_by_state(
470
    query: Query, state_filter: Optional[UserStateFilter] = None
471
) -> Query:
472 1
    if state_filter == UserStateFilter.active:
473
        return query \
474
            .filter_by(initialized=True) \
475
            .filter_by(suspended=False) \
476
            .filter_by(deleted=False)
477 1
    elif state_filter == UserStateFilter.uninitialized:
478
        return query \
479
            .filter_by(initialized=False) \
480
            .filter_by(suspended=False) \
481
            .filter_by(deleted=False)
482 1
    elif state_filter == UserStateFilter.suspended:
483
        return query \
484
            .filter_by(suspended=True) \
485
            .filter_by(deleted=False)
486 1
    elif state_filter == UserStateFilter.deleted:
487
        return query \
488
            .filter_by(deleted=True)
489
    else:
490 1
        return query
491
492
493 1
def _filter_by_search_term(query: Query, search_term: str) -> Query:
494
    terms = search_term.split(' ')
495
    clauses = map(_generate_search_clauses_for_term, terms)
496
497
    return query \
498
        .join(DbUserDetail) \
499
        .filter(db.and_(*clauses))
500
501
502 1
def _generate_search_clauses_for_term(search_term: str) -> Query:
503
    ilike_pattern = f'%{search_term}%'
504
505
    return db.or_(
506
        DbUser.email_address.ilike(ilike_pattern),
507
        DbUser.screen_name.ilike(ilike_pattern),
508
        DbUserDetail.first_names.ilike(ilike_pattern),
509
        DbUserDetail.last_name.ilike(ilike_pattern),
510
    )
511