Passed
Push — main ( 1f94b4...a29a2e )
by Jochen
04:44
created

byceps.services.user_avatar.service   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 145
Duplicated Lines 23.45 %

Test Coverage

Coverage 81.48%

Importance

Changes 0
Metric Value
eloc 80
dl 34
loc 145
ccs 44
cts 54
cp 0.8148
rs 10
c 0
b 0
f 0
wmc 12

7 Functions

Rating   Name   Duplication   Size   Complexity  
A remove_avatar_image() 0 13 2
A update_avatar_image() 34 34 3
A get_avatars_uploaded_by_user() 0 8 1
A get_avatar_urls_for_users() 0 21 2
A get_avatar_url_for_user() 0 4 1
A get_avatar_url_for_md5_email_address_hash() 0 15 2
A get_db_avatar() 0 6 1

How to fix   Duplicated Code   

Duplicated Code

Duplicated code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""
2
byceps.services.user_avatar.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from typing import BinaryIO, Optional
11
12 1
from sqlalchemy import select
13
14 1
from ...database import db
15 1
from ...typing import UserID
16 1
from ...util.image import create_thumbnail
17 1
from ...util.image.models import Dimensions, ImageType
18 1
from ...util import upload
19
20 1
from ..image import service as image_service
21 1
from ..image.service import ImageTypeProhibited  # Provide to view functions.
22 1
from ..user.dbmodels.user import User as DbUser
23 1
from ..user import service as user_service
24
25 1
from .dbmodels import Avatar as DbAvatar, AvatarSelection as DbAvatarSelection
26 1
from .transfer.models import AvatarID, AvatarUpdate
27
28
29 1
# Default value of the `maximum_dimensions` argument of
# `update_avatar_image()`; images exceeding it are replaced by a thumbnail.
MAXIMUM_DIMENSIONS = Dimensions(512, 512)
30
31
32 1 View Code Duplication
def update_avatar_image(
    user_id: UserID,
    stream: BinaryIO,
    allowed_types: set[ImageType],
    *,
    maximum_dimensions: Dimensions = MAXIMUM_DIMENSIONS,
) -> AvatarID:
    """Store the image from the stream as the user's new avatar.

    If the image exceeds `maximum_dimensions` or is not square, it is
    replaced by the result of `create_thumbnail()` (called with
    `force_square=True`) before being stored.

    Return the ID of the newly created avatar.

    Raise `ImageTypeProhibited` if the stream data is not one of the
    allowed types.
    """
    db_user = user_service.get_db_user(user_id)

    detected_type = image_service.determine_image_type(stream, allowed_types)
    dimensions = image_service.determine_dimensions(stream)

    if dimensions > maximum_dimensions or not dimensions.is_square:
        stream = create_thumbnail(
            stream,
            detected_type.name,
            maximum_dimensions,
            force_square=True,
        )

    # Commit the avatar record first; the file is stored under the
    # record's `path` afterwards.
    new_avatar = DbAvatar(db_user.id, detected_type)
    db.session.add(new_avatar)
    db.session.commit()

    # Might raise `FileExistsError`.
    upload.store(
        stream, new_avatar.path, create_parent_path_if_nonexistent=True
    )

    # Only link the avatar to the user once the file has been stored.
    db_user.avatar = new_avatar
    db.session.commit()

    return new_avatar.id
66
67
68 1
def remove_avatar_image(user_id: UserID) -> None:
    """Remove the user's avatar image.

    Only the selection that links an avatar to the user is deleted. The
    avatar's database record as well as the image file itself are not
    removed.

    Raise `ValueError` if no avatar is set for the user.
    """
    # Primary key lookup via `Session.get()` instead of the legacy
    # `Query.get()`; both return `None` if no row matches.
    selection = db.session.get(DbAvatarSelection, user_id)

    if selection is None:
        raise ValueError(f'No avatar set for user ID {user_id}.')

    db.session.delete(selection)
    db.session.commit()
81
82
83 1
def get_db_avatar(avatar_id: AvatarID) -> DbAvatar:
    """Look up and return the avatar with that ID.

    The result is fetched with `scalar_one()`, so an exception is
    raised if the avatar is not found.
    """
    statement = select(DbAvatar).filter_by(id=avatar_id)
    result = db.session.execute(statement)
    return result.scalar_one()
89
90
91 1
def get_avatars_uploaded_by_user(user_id: UserID) -> list[AvatarUpdate]:
    """Return one `AvatarUpdate` (creation time and URL) for each avatar
    whose creator is the given user.
    """
    db_avatars = (
        db.session.query(DbAvatar)
        .filter_by(creator_id=user_id)
        .all()
    )

    return [
        AvatarUpdate(db_avatar.created_at, db_avatar.url)
        for db_avatar in db_avatars
    ]
99
100
101 1
def get_avatar_url_for_user(user_id: UserID) -> Optional[str]:
    """Return the URL of the user's current avatar, or `None` if not set.

    Delegates to `get_avatar_urls_for_users()` with a single-element set.
    """
    return get_avatar_urls_for_users({user_id}).get(user_id)
105
106
107 1
def get_avatar_urls_for_users(
    user_ids: set[UserID],
) -> dict[UserID, Optional[str]]:
    """Map each given user ID to the URL of that user's current avatar.

    User IDs for which no avatar selection is found are mapped to
    `None`. An empty `user_ids` yields an empty dict without querying
    the database.
    """
    if not user_ids:
        return {}

    rows = (
        db.session.query(DbAvatarSelection.user_id, DbAvatar)
        .join(DbAvatar)
        .filter(DbAvatarSelection.user_id.in_(user_ids))
        .all()
    )

    found_urls = {
        selected_user_id: db_avatar.url
        for selected_user_id, db_avatar in rows
    }

    # Include all user IDs in result.
    return {
        requested_id: found_urls.get(requested_id)
        for requested_id in user_ids
    }
128
129
130 1
def get_avatar_url_for_md5_email_address_hash(md5_hash: str) -> Optional[str]:
    """Return the URL of the avatar currently selected by the user whose
    MD5-hashed email address equals `md5_hash`, or `None` if no such
    avatar is found.
    """
    matching_avatar = (
        db.session.query(DbAvatar)
        .join(DbAvatarSelection)
        .join(DbUser)
        # The hash is computed by the database via its `md5` function.
        .filter(db.func.md5(DbUser.email_address) == md5_hash)
        .one_or_none()
    )

    return None if matching_avatar is None else matching_avatar.url
145