1
|
|
|
""" |
2
|
|
|
byceps.services.user_avatar.service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2021 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from __future__ import annotations |
10
|
1 |
|
from typing import BinaryIO, Optional |
11
|
|
|
|
12
|
1 |
|
from sqlalchemy import select |
13
|
|
|
|
14
|
1 |
|
from ...database import db |
15
|
1 |
|
from ...typing import UserID |
16
|
1 |
|
from ...util.image import create_thumbnail |
17
|
1 |
|
from ...util.image.models import Dimensions, ImageType |
18
|
1 |
|
from ...util import upload |
19
|
|
|
|
20
|
1 |
|
from ..image import service as image_service |
21
|
1 |
|
from ..image.service import ImageTypeProhibited # Provide to view functions. |
22
|
1 |
|
from ..user.dbmodels.user import User as DbUser |
23
|
1 |
|
from ..user import service as user_service |
24
|
|
|
|
25
|
1 |
|
from .dbmodels import Avatar as DbAvatar, AvatarSelection as DbAvatarSelection |
26
|
1 |
|
from .transfer.models import AvatarID, AvatarUpdate |
27
|
|
|
|
28
|
|
|
|
29
|
1 |
|
MAXIMUM_DIMENSIONS = Dimensions(512, 512) |
30
|
|
|
|
31
|
|
|
|
32
|
1 |
View Code Duplication |
def update_avatar_image(
    user_id: UserID,
    stream: BinaryIO,
    allowed_types: set[ImageType],
    *,
    maximum_dimensions: Dimensions = MAXIMUM_DIMENSIONS,
) -> AvatarID:
    """Store the image from the stream as the user's new avatar.

    If the image's dimensions compare greater than `maximum_dimensions`
    or the image is not square, it is first passed through
    `create_thumbnail` with `force_square=True`.

    Return the ID of the newly created avatar.

    Raise `ImageTypeProhibited` if the stream data is not of one of the
    allowed types.
    """
    db_user = user_service.get_db_user(user_id)

    # Both calls inspect the same stream; keep them in this order.
    detected_type = image_service.determine_image_type(stream, allowed_types)
    dimensions = image_service.determine_dimensions(stream)

    exceeds_maximum = dimensions > maximum_dimensions
    needs_thumbnail = exceeds_maximum or not dimensions.is_square
    if needs_thumbnail:
        stream = create_thumbnail(
            stream,
            detected_type.name,
            maximum_dimensions,
            force_square=True,
        )

    # Persist the avatar record before writing the image file.
    db_avatar = DbAvatar(db_user.id, detected_type)
    db.session.add(db_avatar)
    db.session.commit()

    # Might raise `FileExistsError`.
    upload.store(
        stream, db_avatar.path, create_parent_path_if_nonexistent=True
    )

    # Only link the avatar to the user once the file has been stored.
    db_user.avatar = db_avatar
    db.session.commit()

    return db_avatar.id
66
|
|
|
|
67
|
|
|
|
68
|
1 |
|
def remove_avatar_image(user_id: UserID) -> None:
    """Remove the user's avatar image.

    The avatar will be unlinked from the user, but the database record
    as well as the image file itself won't be removed, though.

    Raise `ValueError` if no avatar is currently selected for the user.
    """
    # `Session.get()` supersedes the legacy `Query.get()` and matches
    # the 2.0-style API already used elsewhere in this module.
    selection = db.session.get(DbAvatarSelection, user_id)

    if selection is None:
        raise ValueError(f'No avatar set for user ID {user_id}.')

    db.session.delete(selection)
    db.session.commit()
81
|
|
|
|
82
|
|
|
|
83
|
1 |
|
def get_db_avatar(avatar_id: AvatarID) -> DbAvatar:
    """Look up the avatar with the given ID.

    An exception is raised (by `scalar_one()`) unless exactly one
    matching avatar is found.
    """
    statement = select(DbAvatar).filter_by(id=avatar_id)
    result = db.session.execute(statement)
    return result.scalar_one()
89
|
|
|
|
90
|
|
|
|
91
|
1 |
|
def get_avatars_uploaded_by_user(user_id: UserID) -> list[AvatarUpdate]:
    """Return one `AvatarUpdate` (creation time and URL) for each avatar
    whose creator is the given user.
    """
    query = db.session.query(DbAvatar).filter_by(creator_id=user_id)

    updates = []
    for db_avatar in query.all():
        updates.append(AvatarUpdate(db_avatar.created_at, db_avatar.url))

    return updates
99
|
|
|
|
100
|
|
|
|
101
|
1 |
|
def get_avatar_url_for_user(user_id: UserID) -> Optional[str]:
    """Return the URL of the user's current avatar, or `None` if the
    user has no avatar set.
    """
    # Delegate to the bulk lookup with a single-element set.
    return get_avatar_urls_for_users({user_id}).get(user_id)
105
|
|
|
|
106
|
|
|
|
107
|
1 |
|
def get_avatar_urls_for_users(
    user_ids: set[UserID],
) -> dict[UserID, Optional[str]]:
    """Map each of the given user IDs to the URL of that user's current
    avatar, or to `None` if the user has no avatar selected.

    An empty set of user IDs yields an empty dictionary without
    querying the database.
    """
    if len(user_ids) == 0:
        return {}

    rows = (
        db.session.query(DbAvatarSelection.user_id, DbAvatar)
        .join(DbAvatar)
        .filter(DbAvatarSelection.user_id.in_(user_ids))
        .all()
    )

    found_urls = {}
    for selected_user_id, db_avatar in rows:
        found_urls[selected_user_id] = db_avatar.url

    # Every requested user ID appears in the result, even those
    # without an avatar.
    return {uid: found_urls.get(uid) for uid in user_ids}
128
|
|
|
|
129
|
|
|
|
130
|
1 |
|
def get_avatar_url_for_md5_email_address_hash(md5_hash: str) -> Optional[str]:
    """Return the URL of the current avatar of the user whose email
    address has the given MD5 hash, or `None` if no such avatar is
    found.
    """
    # The hash is computed by the database (`md5()` SQL function) and
    # compared against the given value.
    matches_hash = db.func.md5(DbUser.email_address) == md5_hash

    db_avatar = (
        db.session.query(DbAvatar)
        .join(DbAvatarSelection)
        .join(DbUser)
        .filter(matches_hash)
        .one_or_none()
    )

    return db_avatar.url if db_avatar is not None else None
145
|
|
|
|