Completed
Branch master (4d0d43)
by Jochen
03:26 queued 01:02
created

byceps.services.user.service.find_user_by_email_address()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.user.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from typing import Dict, Optional, Set, Tuple
10
11
from ...database import db, Query
12
from ...typing import PartyID, UserID
13
14
from ..orga_team.models import OrgaTeam, Membership as OrgaTeamMembership
15
from ..user_avatar.models import Avatar, AvatarSelection
16
17
from .models.user import AnonymousUser, User as DbUser
18
from .transfer.models import User
19
20
21
def find_active_db_user(user_id: UserID) -> Optional[DbUser]:
    """Look up the database entity of an "active" user account.

    An account is treated as active only if it is flagged as
    initialized and is neither suspended nor deleted.

    Return `None` if no user with that ID fulfills all of these
    conditions (which includes the case that the ID is unknown).
    """
    active_users = DbUser.query.filter_by(
        initialized=True,
        suspended=False,
        deleted=False,
    )

    return active_users.filter_by(id=user_id).one_or_none()
35
36
37
def find_active_user(
    user_id: UserID,
    *,
    include_avatar: bool = False,
    include_orga_flag_for_party_id: Optional[PartyID] = None,
) -> Optional[User]:
    """Look up an "active" user account and return it as a user DTO.

    An account is treated as active only if it is flagged as
    initialized and is neither suspended nor deleted. Return `None`
    if no user with that ID fulfills all of these conditions (which
    includes the case that the ID is unknown).

    The avatar is only queried if `include_avatar` is set; the
    organizer flag is only determined if a party ID is given.
    """
    # The `== True`/`== False` comparisons are intentional: they are
    # applied to model columns to build filter expressions for the
    # query and must not be rewritten as plain truthiness checks.
    active_user_query = (
        _get_user_query(include_avatar, include_orga_flag_for_party_id)
        .filter(DbUser.initialized == True)
        .filter(DbUser.suspended == False)
        .filter(DbUser.deleted == False)
        .filter(DbUser.id == user_id)
    )

    row = active_user_query.one_or_none()

    return _user_row_to_dto(row) if row is not None else None
63
64
65
def find_user(
    user_id: UserID,
    *,
    include_avatar: bool = False,
    include_orga_flag_for_party_id: Optional[PartyID] = None,
) -> Optional[User]:
    """Look up the user with that ID and return it as a user DTO.

    Return `None` if there is no user with that ID. No account status
    filtering (initialized/suspended/deleted) is applied here.

    The avatar is only queried if `include_avatar` is set; the
    organizer flag is only determined if a party ID is given.
    """
    row = (
        _get_user_query(include_avatar, include_orga_flag_for_party_id)
        .filter(DbUser.id == user_id)
        .one_or_none()
    )

    if row is not None:
        return _user_row_to_dto(row)

    return None
83
84
85
def find_users(
    user_ids: Set[UserID],
    *,
    include_avatars: bool = False,
    include_orga_flags_for_party_id: Optional[PartyID] = None,
) -> Set[User]:
    """Look up the users with those IDs and return them as user DTOs.

    IDs for which no user exists are not represented in the result.

    Avatars are only queried if `include_avatars` is set; organizer
    flags are only determined if a party ID is given.
    """
    # Short-circuit: no IDs requested, so no query is issued at all.
    if not user_ids:
        return set()

    matching_rows = (
        _get_user_query(include_avatars, include_orga_flags_for_party_id)
        .filter(DbUser.id.in_(frozenset(user_ids)))
        .all()
    )

    return set(map(_user_row_to_dto, matching_rows))
105
106
107
def _get_user_query(
    include_avatar: bool,
    include_orga_flags_for_party_id: Optional[PartyID] = None,
) -> Query:
    """Build the base query that selects the columns of a user DTO row.

    Each row consists of, in this order: user ID, screen name,
    suspended flag, deleted flag, avatar, organizer flag. That order is
    what `_user_row_to_dto` unpacks.

    - Without `include_avatar`, a SQL `NULL` is selected in place of
      the avatar and no avatar tables are joined.
    - Without a party ID, a constant SQL `false` is selected in place
      of the organizer flag.
    """
    if include_orga_flags_for_party_id is None:
        orga_flag_expression = db.false()
    else:
        orga_flag_expression = _get_orga_flag_subquery(
            include_orga_flags_for_party_id
        )

    avatar_column = Avatar if include_avatar else db.null()

    query = db.session.query(
        DbUser.id,
        DbUser.screen_name,
        DbUser.suspended,
        DbUser.deleted,
        avatar_column,
        orga_flag_expression,
    )

    if not include_avatar:
        return query

    # Outer joins, so that users without a selected avatar are kept.
    with_avatar_selection = query.outerjoin(
        AvatarSelection, DbUser.avatar_selection
    )
    return with_avatar_selection.outerjoin(Avatar)
132
133
134
def _get_orga_flag_subquery(party_id: PartyID):
    """Build an `EXISTS` expression for the organizer flag.

    The expression is built from a query over organizer team
    memberships, joined with their teams, restricted to teams of the
    given party and to memberships whose user ID equals `DbUser.id`.
    It is meant to be selected as a column of a query on users (see
    `_get_user_query`).
    """
    memberships_in_party = (
        db.session
        .query(db.func.count(OrgaTeamMembership.id))
        .join(OrgaTeam)
        .filter(OrgaTeam.party_id == party_id)
        .filter(OrgaTeamMembership.user_id == DbUser.id)
    )

    return memberships_in_party.exists()
143
144
145
def _user_row_to_dto(
    row: Tuple[UserID, str, bool, bool, Optional[Avatar], bool]
) -> User:
    """Convert a row as selected by `_get_user_query` to a user DTO.

    The row is expected to hold exactly six values: user ID, screen
    name, suspended flag, deleted flag, avatar (or nothing), and
    organizer flag. The avatar is reduced to its URL; a missing
    avatar results in `None` as the avatar URL.
    """
    user_id, screen_name, suspended, deleted, avatar, is_orga = row

    if avatar:
        avatar_url = avatar.url
    else:
        avatar_url = None

    return User(user_id, screen_name, suspended, deleted, avatar_url, is_orga)
159
160
161
def find_user_by_email_address(email_address: str) -> Optional[DbUser]:
    """Look up the user entity with that email address.

    Both the stored address and the given one are lowercased before
    being compared, so the lookup is case-insensitive.

    Return `None` if no user has that address.
    """
    address_matches = (
        db.func.lower(DbUser.email_address) == email_address.lower()
    )

    return DbUser.query.filter(address_matches).one_or_none()
168
169
170
def find_user_by_screen_name(
    screen_name: str, *, case_insensitive=False
) -> Optional[DbUser]:
    """Look up the user entity with that screen name.

    By default, the screen name has to match exactly. With
    `case_insensitive` set, both the stored screen name and the given
    one are lowercased before being compared.

    Return `None` if no user has that screen name.
    """
    if case_insensitive:
        name_matches = (
            db.func.lower(DbUser.screen_name) == screen_name.lower()
        )
        query = DbUser.query.filter(name_matches)
    else:
        query = DbUser.query.filter_by(screen_name=screen_name)

    return query.one_or_none()
184
185
186
def find_user_with_details(user_id: UserID) -> Optional[DbUser]:
    """Look up the user entity with that ID by primary key, with its
    `detail` relationship configured to be loaded via a join.
    """
    load_detail_eagerly = db.joinedload('detail')

    return DbUser.query.options(load_detail_eagerly).get(user_id)
191
192
193
def _db_entity_to_user_dto(user: DbUser) -> User:
    """Convert a user database entity to a user DTO.

    ID, screen name, suspended flag and deleted flag are taken from
    the entity. The avatar URL is always `None` and the organizer flag
    is always `False` in the result.
    """
    no_avatar_url = None
    not_orga = False  # Information is not available here by design.

    return User(
        user.id,
        user.screen_name,
        user.suspended,
        user.deleted,
        no_avatar_url,
        not_orga,
    )
205
206
207
def get_anonymous_user() -> AnonymousUser:
    """Create and return a new `AnonymousUser` object."""
    anonymous_user = AnonymousUser()
    return anonymous_user
210
211
212
def get_email_address(user_id: UserID) -> str:
    """Fetch the e-mail address of the user with that ID.

    Only the e-mail address column is queried.

    Raise `ValueError` if the query yields no value (`None`) for
    that user ID.
    """
    address_query = db.session \
        .query(DbUser.email_address) \
        .filter_by(id=user_id)

    email_address = address_query.scalar()

    if email_address is not None:
        return email_address

    raise ValueError(f"Unknown user ID '{user_id}'")
223
224
225
def index_users_by_id(users: Set[User]) -> Dict[UserID, User]:
    """Build a lookup table that maps each user's ID to that user.

    An empty collection of users results in an empty dictionary.
    """
    users_by_id = {each.id: each for each in users}
    return users_by_id
228
229
230
def is_screen_name_already_assigned(screen_name: str) -> bool:
    """Tell whether any user already has that screen name.

    Screen names are compared case-insensitively.
    """
    is_assigned = _do_users_matching_filter_exist(
        DbUser.screen_name, screen_name
    )
    return is_assigned
233
234
235
def is_email_address_already_assigned(email_address: str) -> bool:
    """Tell whether any user already has that email address.

    Email addresses are compared case-insensitively.
    """
    is_assigned = _do_users_matching_filter_exist(
        DbUser.email_address, email_address
    )
    return is_assigned
238
239
240
def _do_users_matching_filter_exist(
    model_attribute: str, search_value: str
) -> bool:
    """Tell whether at least one user matches the given value.

    `model_attribute` is the user attribute to compare against; the
    callers in this module pass mapped attributes such as
    `DbUser.screen_name` (so the `str` annotation is looser than what
    is actually passed).

    Both sides of the comparison are lowercased, i.e. matching is done
    case-insensitively.
    """
    matching_users = db.session \
        .query(DbUser) \
        .filter(db.func.lower(model_attribute) == search_value.lower())

    # Select only the result of the `EXISTS` expression instead of
    # fetching the matching users themselves.
    return db.session.query(matching_users.exists()).scalar()
255