Completed
Push — main ( 97c1ff...2a7d5a )
by Jochen
05:40
created

byceps.services.user.service.get_sort_key_for_screen_name()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
dl 0
loc 10
rs 10
c 0
b 0
f 0
nop 1
ccs 4
cts 4
cp 1
crap 1
1
"""
2
byceps.services.user.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from typing import Dict, Optional, Set, Tuple
10
11 1
from ...database import db, Query
12 1
from ...typing import PartyID, UserID
13
14 1
from ..orga_team.models import OrgaTeam, Membership as OrgaTeamMembership
15 1
from ..user_avatar.models import Avatar, AvatarSelection
16
17 1
from .models.detail import UserDetail as DbUserDetail
18 1
from .models.user import AnonymousUser, User as DbUser
19 1
from .transfer.models import User, UserDetail, UserWithDetail
20
21
22 1
class UserIdRejected(Exception):
23
    """Indicate that the given user ID is not accepted.
24
25
    Reasons can include the user ID being
26
    - not well-formed,
27
    - unknown,
28
    or the associated account being
29
    - not yet initialized,
30
    - suspended,
31
    - or deleted.
32
    """
33
34
35 1
def find_active_db_user(user_id: UserID) -> Optional[DbUser]:
36
    """Return the user with that ID if the account is "active", or
37
    `None` if:
38
    - the ID is unknown.
39
    - the account has not been activated, yet.
40
    - the account is currently suspended.
41
    - the account is marked as deleted.
42
    """
43 1
    return DbUser.query \
44
        .filter_by(initialized=True) \
45
        .filter_by(suspended=False) \
46
        .filter_by(deleted=False) \
47
        .filter_by(id=user_id) \
48
        .one_or_none()
49
50
51 1
def find_active_user(
52
    user_id: UserID,
53
    *,
54
    include_avatar: bool = False,
55
    include_orga_flag_for_party_id: Optional[PartyID] = None,
56
) -> Optional[User]:
57
    """Return the user with that ID if the account is "active", or
58
    `None` if:
59
    - the ID is unknown.
60
    - the account has not been activated, yet.
61
    - the account is currently suspended.
62
    - the account is marked as deleted.
63
    """
64 1
    query = _get_user_query(include_avatar, include_orga_flag_for_party_id)
65
66 1
    row = query \
67
        .filter(DbUser.initialized == True) \
68
        .filter(DbUser.suspended == False) \
69
        .filter(DbUser.deleted == False) \
70
        .filter(DbUser.id == user_id) \
71
        .one_or_none()
72
73 1
    if row is None:
74 1
        return None
75
76 1
    return _user_row_to_dto(row)
77
78
79 1
def find_user(
80
    user_id: UserID,
81
    *,
82
    include_avatar: bool = False,
83
    include_orga_flag_for_party_id: Optional[PartyID] = None,
84
) -> Optional[User]:
85
    """Return the user with that ID, or `None` if not found.
86
87
    Include avatar URLs if requested.
88
    """
89 1
    row = _get_user_query(include_avatar, include_orga_flag_for_party_id) \
90
        .filter(DbUser.id == user_id) \
91
        .one_or_none()
92
93 1
    if row is None:
94
        return None
95
96 1
    return _user_row_to_dto(row)
97
98
99 1
def find_users(
100
    user_ids: Set[UserID],
101
    *,
102
    include_avatars: bool = False,
103
    include_orga_flags_for_party_id: Optional[PartyID] = None,
104
) -> Set[User]:
105
    """Return the users with those IDs.
106
107
    Their respective avatars' URLs are included, if requested.
108
    """
109 1
    if not user_ids:
110 1
        return set()
111
112 1
    query = _get_user_query(include_avatars, include_orga_flags_for_party_id)
113
114 1
    rows = query \
115
        .filter(DbUser.id.in_(frozenset(user_ids))) \
116
        .all()
117
118 1
    return {_user_row_to_dto(row) for row in rows}
119
120
121 1
def _get_user_query(
122
    include_avatar: bool,
123
    include_orga_flags_for_party_id: Optional[PartyID] = None,
124
) -> Query:
125 1
    orga_flag_expression = db.false()
126 1
    if include_orga_flags_for_party_id is not None:
127 1
        orga_flag_expression = _get_orga_flag_subquery(
128
            include_orga_flags_for_party_id)
129
130 1
    query = db.session \
131
        .query(
132
            DbUser.id,
133
            DbUser.screen_name,
134
            DbUser.suspended,
135
            DbUser.deleted,
136
            Avatar if include_avatar else db.null(),
137
            orga_flag_expression,
138
        )
139
140 1
    if include_avatar:
141 1
        query = query \
142
            .outerjoin(AvatarSelection, DbUser.avatar_selection) \
143
            .outerjoin(Avatar)
144
145 1
    return query
146
147
148 1
def _get_orga_flag_subquery(party_id: PartyID):
149 1
    return db.session \
150
        .query(
151
            db.func.count(OrgaTeamMembership.id)
152
        ) \
153
        .join(OrgaTeam) \
154
        .filter(OrgaTeam.party_id == party_id) \
155
        .filter(OrgaTeamMembership.user_id == DbUser.id) \
156
        .exists()
157
158
159 1
def _user_row_to_dto(
160
    row: Tuple[UserID, str, bool, bool, Optional[Avatar], bool]
161
) -> User:
162 1
    user_id, screen_name, suspended, deleted, avatar, is_orga = row
163 1
    avatar_url = avatar.url if avatar else None
164
165 1
    return User(
166
        user_id,
167
        screen_name,
168
        suspended,
169
        deleted,
170
        avatar_url,
171
        is_orga,
172
    )
173
174
175 1
def find_user_by_email_address(email_address: str) -> Optional[DbUser]:
176
    """Return the user with that email address, or `None` if not found."""
177 1
    return DbUser.query \
178
        .filter(
179
            db.func.lower(DbUser.email_address) == email_address.lower()
180
        ) \
181
        .one_or_none()
182
183
184 1
def find_user_by_screen_name(
185
    screen_name: str, *, case_insensitive=False
186
) -> Optional[DbUser]:
187
    """Return the user with that screen name, or `None` if not found."""
188 1
    query = DbUser.query
189
190 1
    if case_insensitive:
191 1
        query = query.filter(
192
            db.func.lower(DbUser.screen_name) == screen_name.lower()
193
        )
194
    else:
195 1
        query = query.filter_by(screen_name=screen_name)
196
197 1
    return query.one_or_none()
198
199
200 1
def find_user_with_details(user_id: UserID) -> Optional[DbUser]:
201
    """Return the user and its details."""
202
    return DbUser.query \
203
        .options(db.joinedload('detail')) \
204
        .get(user_id)
205
206
207 1
def get_db_user(user_id: UserID) -> Optional[DbUser]:
208
    """Return the user with that ID, or raise an exception."""
209 1
    user = DbUser.query.get(user_id)
210
211 1
    if user is None:
212
        raise ValueError(f"Unknown user ID '{user_id}'")
213
214 1
    return user
215
216
217 1
def _db_entity_to_user(user: DbUser) -> User:
218 1
    avatar_url = None
219 1
    is_orga = False  # Information is not available here by design.
220
221 1
    return User(
222
        user.id,
223
        user.screen_name,
224
        user.suspended,
225
        user.deleted,
226
        avatar_url,
227
        is_orga,
228
    )
229
230
231 1
def _db_entity_to_user_detail(detail: DbUserDetail) -> UserDetail:
232 1
    return UserDetail(
233
        detail.first_names,
234
        detail.last_name,
235
        detail.date_of_birth,
236
        detail.country,
237
        detail.zip_code,
238
        detail.city,
239
        detail.street,
240
        detail.phone_number,
241
        detail.internal_comment,
242
        detail.extras,
243
    )
244
245
246 1
def _db_entity_to_user_with_detail(user: DbUser) -> User:
247 1
    user_dto = _db_entity_to_user(user)
248 1
    detail_dto = _db_entity_to_user_detail(user.detail)
249
250 1
    return UserWithDetail(
251
        user_dto.id,
252
        user_dto.screen_name,
253
        user_dto.suspended,
254
        user_dto.deleted,
255
        user_dto.avatar_url,
256
        user_dto.is_orga,
257
        detail_dto,
258
    )
259
260
261 1
def get_anonymous_user() -> AnonymousUser:
262
    """Return the anonymous user."""
263 1
    return AnonymousUser()
264
265
266 1
def get_email_address(user_id: UserID) -> str:
267
    """Return the user's e-mail address."""
268 1
    email_address = db.session \
269
        .query(DbUser.email_address) \
270
        .filter_by(id=user_id) \
271
        .scalar()
272
273 1
    if email_address is None:
274 1
        raise ValueError(
275
            f"Unknown user ID '{user_id}' or user has no email address"
276
        )
277
278 1
    return email_address
279
280
281 1
def get_sort_key_for_screen_name(user: User) -> Tuple[bool, str]:
282
    """Return a key for sorting by screen name.
283
284
    - Orders screen names case-insensitively.
285
    - Handles absent screen names (i.e. `None`) and places them at the
286
      end.
287
    """
288 1
    normalized_screen_name = (user.screen_name or '').lower()
289 1
    has_screen_name = bool(normalized_screen_name)
290 1
    return not has_screen_name, normalized_screen_name
291
292
293 1
def index_users_by_id(users: Set[User]) -> Dict[UserID, User]:
294
    """Map the users' IDs to the corresponding user objects."""
295 1
    return {user.id: user for user in users}
296
297
298 1
def is_screen_name_already_assigned(screen_name: str) -> bool:
299
    """Return `True` if a user with that screen name exists."""
300 1
    return _do_users_matching_filter_exist(DbUser.screen_name, screen_name)
301
302
303 1
def is_email_address_already_assigned(email_address: str) -> bool:
304
    """Return `True` if a user with that email address exists."""
305 1
    return _do_users_matching_filter_exist(DbUser.email_address, email_address)
306
307
308 1
def _do_users_matching_filter_exist(
309
    model_attribute: str, search_value: str
310
) -> bool:
311
    """Return `True` if any users match the filter.
312
313
    Comparison is done case-insensitively.
314
    """
315 1
    return db.session \
316
        .query(
317
            db.session \
318
                .query(DbUser) \
319
                .filter(db.func.lower(model_attribute) == search_value.lower()) \
320
                .exists()
321
        ) \
322
        .scalar()
323