Passed
Push — main ( d49116...8987e2 )
by Jochen
04:25
created

byceps.services.user.service.get_detail()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.user.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from typing import Optional
11
12 1
from ...database import db, Query
13 1
from ...typing import PartyID, UserID
14
15 1
from ..orga_team.dbmodels import OrgaTeam, Membership as OrgaTeamMembership
16 1
from ..user_avatar.dbmodels import Avatar, AvatarSelection
17
18 1
from .dbmodels.detail import UserDetail as DbUserDetail
19 1
from .dbmodels.user import User as DbUser
20 1
from .transfer.models import User, UserDetail, UserWithDetail
21
22
23 1
class UserIdRejected(Exception):
    """Indicate that the given user ID is not accepted.

    Reasons can include the user ID being

    - not well-formed,
    - unknown,

    or the associated account being

    - not yet initialized,
    - suspended,
    - or deleted.
    """
34
35
36 1
def find_active_user(
    user_id: UserID,
    *,
    include_avatar: bool = False,
    include_orga_flag_for_party_id: Optional[PartyID] = None,
) -> Optional[User]:
    """Return the user with that ID if the account is "active".

    Return `None` if:

    - the ID is unknown.
    - the account has not been initialized, yet.
    - the account is currently suspended.
    - the account is marked as deleted.

    The avatar and the orga flag for the given party are only queried
    if requested.
    """
    query = _get_user_query(include_avatar, include_orga_flag_for_party_id)

    # The `== True`/`== False` comparisons build SQL expressions on the
    # mapped columns; they must not be replaced with `is`/`not`.
    row = query \
        .filter(DbUser.initialized == True) \
        .filter(DbUser.suspended == False) \
        .filter(DbUser.deleted == False) \
        .filter(DbUser.id == user_id) \
        .one_or_none()

    if row is None:
        return None

    return _user_row_to_dto(row)
62
63
64 1
def find_user(
    user_id: UserID,
    *,
    include_avatar: bool = False,
    include_orga_flag_for_party_id: Optional[PartyID] = None,
) -> Optional[User]:
    """Return the user with that ID, or `None` if not found.

    Include avatar URL if requested. Include the orga flag for the
    given party if a party ID is passed.
    """
    row = _get_user_query(include_avatar, include_orga_flag_for_party_id) \
        .filter(DbUser.id == user_id) \
        .one_or_none()

    if row is None:
        return None

    return _user_row_to_dto(row)
82
83
84 1
def get_user(
    user_id: UserID,
    *,
    include_avatar: bool = False,
    include_orga_flag_for_party_id: Optional[PartyID] = None,
) -> User:
    """Return the user with that ID, or raise an exception.

    Include avatar URL if requested.

    Raise `ValueError` if no user with that ID is found.
    """
    user = find_user(
        user_id,
        include_avatar=include_avatar,
        include_orga_flag_for_party_id=include_orga_flag_for_party_id,
    )

    if user is None:
        raise ValueError(f"Unknown user ID '{user_id}'")

    return user
104
105
106 1
def find_users(
    user_ids: set[UserID],
    *,
    include_avatars: bool = False,
    include_orga_flags_for_party_id: Optional[PartyID] = None,
) -> set[User]:
    """Return the users with those IDs.

    Their respective avatars' URLs are included, if requested.

    An empty set of IDs yields an empty result without querying the
    database.
    """
    if not user_ids:
        return set()

    query = _get_user_query(include_avatars, include_orga_flags_for_party_id)

    rows = query \
        .filter(DbUser.id.in_(frozenset(user_ids))) \
        .all()

    return {_user_row_to_dto(row) for row in rows}
126
127
128 1
def _get_user_query(
    include_avatar: bool,
    include_orga_flags_for_party_id: Optional[PartyID] = None,
) -> Query:
    """Build the base query that selects the columns of a user row.

    The selected columns are, in this order: ID, screen name, suspended
    flag, deleted flag, locale, avatar, orga flag. `_user_row_to_dto`
    unpacks rows positionally, so the order must be kept in sync.

    If no avatar is requested, a SQL `NULL` is selected in its place.
    If no party ID is given, a SQL `false` is selected as orga flag.
    """
    orga_flag_expression = db.false()
    if include_orga_flags_for_party_id is not None:
        orga_flag_expression = _get_orga_flag_subquery(
            include_orga_flags_for_party_id)

    query = db.session \
        .query(
            DbUser.id,
            DbUser.screen_name,
            DbUser.suspended,
            DbUser.deleted,
            DbUser.locale,
            Avatar if include_avatar else db.null(),
            orga_flag_expression,
        )

    if include_avatar:
        # Outer joins keep users without a selected avatar in the result.
        query = query \
            .outerjoin(AvatarSelection, DbUser.avatar_selection) \
            .outerjoin(Avatar)

    return query
154
155
156 1
def _get_orga_flag_subquery(party_id: PartyID):
    """Return an `EXISTS` expression for the user's orga status.

    The expression is correlated with the enclosing user query via
    `DbUser.id` and checks for orga team memberships of that user in
    teams belonging to the given party.
    """
    return db.session \
        .query(
            db.func.count(OrgaTeamMembership.id)
        ) \
        .join(OrgaTeam) \
        .filter(OrgaTeam.party_id == party_id) \
        .filter(OrgaTeamMembership.user_id == DbUser.id) \
        .exists()
165
166
167 1
def _user_row_to_dto(
    row: tuple[UserID, str, bool, bool, Optional[str], Optional[Avatar], bool]
) -> User:
    """Convert a row produced by `_get_user_query` to a user DTO.

    The row is unpacked positionally; the avatar URL is `None` if the
    row carries no avatar.
    """
    user_id, screen_name, suspended, deleted, locale, avatar, is_orga = row
    avatar_url = avatar.url if (avatar is not None) else None

    return User(
        id=user_id,
        screen_name=screen_name,
        suspended=suspended,
        deleted=deleted,
        locale=locale,
        avatar_url=avatar_url,
        is_orga=is_orga,
    )
182
183
184 1
def find_user_by_email_address(email_address: str) -> Optional[User]:
    """Return the user with that email address, or `None` if not found.

    The address is compared case-insensitively.
    """
    user = DbUser.query \
        .filter(
            db.func.lower(DbUser.email_address) == email_address.lower()
        ) \
        .one_or_none()

    if user is None:
        return None

    return _db_entity_to_user(user)
196
197
198 1
def find_user_by_screen_name(
    screen_name: str, *, case_insensitive: bool = False
) -> Optional[User]:
    """Return the user with that screen name, or `None` if not found.

    The screen name is matched exactly unless `case_insensitive` is
    set.
    """
    user = find_db_user_by_screen_name(
        screen_name, case_insensitive=case_insensitive
    )

    if user is None:
        return None

    return _db_entity_to_user(user)
210
211
212 1
def find_db_user_by_screen_name(
    screen_name: str, *, case_insensitive: bool = False
) -> Optional[DbUser]:
    """Return the user entity with that screen name, or `None` if not
    found.

    The screen name is matched exactly unless `case_insensitive` is
    set, in which case both sides are lowercased before comparison.
    """
    query = DbUser.query

    if case_insensitive:
        query = query.filter(
            db.func.lower(DbUser.screen_name) == screen_name.lower()
        )
    else:
        query = query.filter_by(screen_name=screen_name)

    return query.one_or_none()
226
227
228 1
def find_user_with_details(user_id: UserID) -> Optional[DbUser]:
    """Return the user and its details, or `None` if not found.

    The `detail` relationship is loaded eagerly via a join.
    """
    return DbUser.query \
        .options(db.joinedload('detail')) \
        .get(user_id)
233
234
235 1
def get_db_user(user_id: UserID) -> DbUser:
    """Return the user with that ID, or raise an exception.

    Raise `ValueError` if no user with that ID is found.
    """
    user = DbUser.query.get(user_id)

    if user is None:
        raise ValueError(f"Unknown user ID '{user_id}'")

    return user
243
244
245 1
def _db_entity_to_user(user: DbUser) -> User:
    """Convert a user database entity to a user DTO.

    The resulting object never carries an avatar URL, and its orga flag
    is always `False`.
    """
    avatar_url = None
    is_orga = False  # Information is not available here by design.

    return User(
        id=user.id,
        screen_name=user.screen_name,
        suspended=user.suspended,
        deleted=user.deleted,
        locale=user.locale,
        avatar_url=avatar_url,
        is_orga=is_orga,
    )
258
259
260 1
def _db_entity_to_user_detail(detail: DbUserDetail) -> UserDetail:
    """Convert a user detail database entity to a user detail DTO.

    All fields are copied over unchanged.
    """
    return UserDetail(
        first_names=detail.first_names,
        last_name=detail.last_name,
        date_of_birth=detail.date_of_birth,
        country=detail.country,
        zip_code=detail.zip_code,
        city=detail.city,
        street=detail.street,
        phone_number=detail.phone_number,
        internal_comment=detail.internal_comment,
        extras=detail.extras,
    )
273
274
275 1
def _db_entity_to_user_with_detail(user: DbUser) -> UserWithDetail:
    """Convert a user database entity to a DTO that includes the
    user's details.

    The basic user fields are taken from `_db_entity_to_user`, so the
    avatar URL and orga flag carry that function's fixed values.
    """
    user_dto = _db_entity_to_user(user)
    detail_dto = _db_entity_to_user_detail(user.detail)

    return UserWithDetail(
        id=user_dto.id,
        screen_name=user_dto.screen_name,
        suspended=user_dto.suspended,
        deleted=user_dto.deleted,
        locale=user_dto.locale,
        avatar_url=user_dto.avatar_url,
        is_orga=user_dto.is_orga,
        detail=detail_dto,
    )
289
290
291 1
def find_screen_name(user_id: UserID) -> Optional[str]:
    """Return the user's screen name, if available.

    Return `None` if the user ID is unknown or the user has no screen
    name.
    """
    # `scalar()` already yields `None` when there is no row or the
    # column is NULL, so no separate `None` check is needed.
    return db.session \
        .query(DbUser.screen_name) \
        .filter_by(id=user_id) \
        .scalar()
302
303
304 1
def find_email_address(user_id: UserID) -> Optional[str]:
    """Return the user's e-mail address, if set.

    Return `None` if the user ID is unknown or no address is set.
    """
    return db.session \
        .query(DbUser.email_address) \
        .filter_by(id=user_id) \
        .scalar()
310
311
312 1
def get_email_address(user_id: UserID) -> str:
    """Return the user's e-mail address.

    Raise `ValueError` if the user ID is unknown or the user has no
    e-mail address.
    """
    email_address = find_email_address(user_id)

    if email_address is None:
        raise ValueError(
            f"Unknown user ID '{user_id}' or user has no email address"
        )

    return email_address
322
323
324 1
def get_email_addresses(user_ids: set[UserID]) -> set[tuple[UserID, str]]:
    """Return the users' e-mail addresses.

    The result is a set of (user ID, e-mail address) rows. IDs that
    match no user are absent from it.

    An empty set of IDs yields an empty result without querying the
    database.
    """
    if not user_ids:
        return set()

    rows = db.session \
        .query(
            DbUser.id,
            DbUser.email_address,
        ) \
        .filter(DbUser.id.in_(user_ids)) \
        .all()

    # `all()` returns a list; convert it to honor the declared return
    # type. The row objects themselves are kept as they are.
    return set(rows)
333
334
335 1
def get_detail(user_id: UserID) -> UserDetail:
    """Return the user's details.

    Raise `ValueError` if no user with that ID is found.
    """
    user = get_db_user(user_id)
    return _db_entity_to_user_detail(user.detail)
339
340
341 1
def get_sort_key_for_screen_name(user: User) -> tuple[bool, str]:
    """Return a key for sorting by screen name.

    - Orders screen names case-insensitively.
    - Handles absent screen names (i.e. `None`) and places them at the
      end.
    """
    normalized_screen_name = (user.screen_name or '').lower()
    has_screen_name = bool(normalized_screen_name)
    # `False` sorts before `True`, so users with a screen name come
    # first.
    return not has_screen_name, normalized_screen_name
351
352
353 1
def index_users_by_id(users: set[User]) -> dict[UserID, User]:
    """Map the users' IDs to the corresponding user objects."""
    return {user.id: user for user in users}
356
357
358 1
def is_screen_name_already_assigned(screen_name: str) -> bool:
    """Return `True` if a user with that screen name exists.

    Comparison is done case-insensitively.
    """
    return _do_users_matching_filter_exist(DbUser.screen_name, screen_name)
361
362
363 1
def is_email_address_already_assigned(email_address: str) -> bool:
    """Return `True` if a user with that email address exists.

    Comparison is done case-insensitively.
    """
    return _do_users_matching_filter_exist(DbUser.email_address, email_address)
366
367
368 1
def _do_users_matching_filter_exist(
    model_attribute: str, search_value: str
) -> bool:
    """Return `True` if any users match the filter.

    Comparison is done case-insensitively.

    Note: despite the `str` annotation, the callers in this module pass
    mapped column attributes (`DbUser.screen_name`,
    `DbUser.email_address`) as `model_attribute`.
    """
    return db.session \
        .query(
            db.session \
                .query(DbUser) \
                .filter(db.func.lower(model_attribute) == search_value.lower()) \
                .exists()
        ) \
        .scalar()
383