Passed
Branch main (854eb5)
by Jochen
04:24
created

byceps.services.user_badge.awarding_service   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 177
Duplicated Lines 0 %

Test Coverage

Coverage 92.06%

Importance

Changes 0
Metric Value
eloc 111
dl 0
loc 177
ccs 58
cts 63
cp 0.9206
rs 10
c 0
b 0
f 0
wmc 15

6 Functions

Rating   Name   Duplication   Size   Complexity  
A _db_entity_to_badge_awarding() 0 6 1
A get_awardings_of_badge() 0 18 1
A get_badges_awarded_to_user() 0 30 3
B award_badge_to_user() 0 39 5
A count_awardings() 0 16 1
A get_badges_awarded_to_users() 0 26 4
1
"""
2
byceps.services.user_badge.awarding_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from collections import defaultdict
11 1
from datetime import datetime
12 1
from typing import Optional
13
14 1
from ...database import db
15 1
from ...events.user_badge import UserBadgeAwarded
16 1
from ...typing import UserID
17
18 1
from ..user import event_service, service as user_service
19
20 1
from .badge_service import _db_entity_to_badge, get_badge, get_badges
21 1
from .dbmodels.awarding import BadgeAwarding as DbBadgeAwarding
22 1
from .dbmodels.badge import Badge as DbBadge
23 1
from .transfer.models import (
24
    Badge,
25
    BadgeAwarding,
26
    BadgeID,
27
    QuantifiedBadgeAwarding,
28
)
29
30
31 1
def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user.

    The awarding and a ``user-badge-awarded`` user event are added to
    the database session and persisted with a single commit.

    :param badge_id: ID of the badge to award.
    :param user_id: ID of the user who receives the badge.
    :param initiator_id: ID of the user who initiated the awarding, if
        any.
    :return: The awarding as a transfer object, and the
        `UserBadgeAwarded` event describing it.
    """
    # Look up all referenced entities before anything is added to the
    # session.
    badge = get_badge(badge_id)
    user = user_service.get_user(user_id)
    # NOTE(review): `utcnow()` yields a naive datetime; presumably the
    # database columns expect that -- confirm before switching to an
    # aware timestamp.
    awarded_at = datetime.utcnow()

    if initiator_id is not None:
        initiator = user_service.get_user(initiator_id)
    else:
        initiator = None

    awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=awarded_at)
    db.session.add(awarding)

    event_data: dict[str, str] = {'badge_id': str(badge_id)}
    # Use the same `None` test as for the initiator lookup above so the
    # persisted event data and the returned event cannot disagree on
    # whether there is an initiator.
    if initiator_id is not None:
        event_data['initiator_id'] = str(initiator_id)
    db_event = event_service.build_event(
        'user-badge-awarded', user_id, event_data, occurred_at=awarded_at
    )
    db.session.add(db_event)

    db.session.commit()

    awarding_dto = _db_entity_to_badge_awarding(awarding)

    # Distinct from the persisted user event above; this one is handed
    # back to the caller.
    event = UserBadgeAwarded(
        occurred_at=awarded_at,
        initiator_id=initiator.id if initiator else None,
        initiator_screen_name=initiator.screen_name if initiator else None,
        user_id=user_id,
        user_screen_name=user.screen_name,
        badge_id=badge_id,
        badge_label=badge.label,
    )

    return awarding_dto, event
70
71
72 1
def count_awardings() -> dict[BadgeID, int]:
    """Return, for each badge, how many times it has been awarded.

    A badge can be awarded to the same user more than once, so the
    number of awardings is not the number of distinct awardees.
    """
    awarding_count = db.func.count(DbBadgeAwarding.id)

    # The outer join keeps badges that have no awarding rows in the
    # result.
    query = (
        db.session
        .query(DbBadge.id, awarding_count)
        .outerjoin(DbBadgeAwarding)
        .group_by(DbBadge.id)
    )

    return {badge_id: quantity for badge_id, quantity in query.all()}
88
89
90 1
def get_awardings_of_badge(badge_id: BadgeID) -> set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge, one entry per user together
    with the number of times that user was awarded the badge.
    """
    quantity_column = db.func.count(DbBadgeAwarding.badge_id)

    query = (
        db.session
        .query(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id,
            quantity_column,
        )
        .filter(DbBadgeAwarding.badge_id == badge_id)
        .group_by(DbBadgeAwarding.badge_id, DbBadgeAwarding.user_id)
    )

    quantified_awardings = set()
    for awarded_badge_id, awardee_id, quantity in query.all():
        quantified_awardings.add(
            QuantifiedBadgeAwarding(awarded_badge_id, awardee_id, quantity)
        )

    return quantified_awardings
109
110
111 1
def get_badges_awarded_to_user(user_id: UserID) -> dict[Badge, int]:
    """Return every badge awarded to the user, mapped to the number of
    times the user received it.
    """
    quantity_column = db.func.count(DbBadgeAwarding.badge_id)

    rows = (
        db.session
        .query(DbBadgeAwarding.badge_id, quantity_column)
        .filter(DbBadgeAwarding.user_id == user_id)
        .group_by(DbBadgeAwarding.badge_id)
        .all()
    )

    quantities_by_badge_id = {row[0]: row[1] for row in rows}

    # No awardings for this user: skip the badge query entirely.
    if not quantities_by_badge_id:
        return {}

    awarded_badge_ids = set(quantities_by_badge_id)
    db_badges = DbBadge.query.filter(DbBadge.id.in_(awarded_badge_ids)).all()

    return {
        _db_entity_to_badge(db_badge): quantities_by_badge_id[db_badge.id]
        for db_badge in db_badges
    }
141
142
143 1
def get_badges_awarded_to_users(
    user_ids: set[UserID], *, featured_only: bool = False
) -> dict[UserID, set[Badge]]:
    """Return the badges awarded to the given users, keyed by user ID.

    `featured_only` is passed through to the badge lookup; awardings
    whose badge is not returned by that lookup are left out. Users
    without any remaining badge do not appear in the result.
    """
    if not user_ids:
        return {}

    db_awardings = (
        DbBadgeAwarding.query
        .filter(DbBadgeAwarding.user_id.in_(user_ids))
        .all()
    )

    awarded_badge_ids = {db_awarding.badge_id for db_awarding in db_awardings}
    badges_by_id = {
        badge.id: badge
        for badge in get_badges(awarded_badge_ids, featured_only=featured_only)
    }

    result: dict[UserID, set[Badge]] = defaultdict(set)
    for db_awarding in db_awardings:
        badge = badges_by_id.get(db_awarding.badge_id)
        if not badge:
            # Badge was not returned by the lookup above.
            continue
        result[db_awarding.user_id].add(badge)

    # Hand back a plain dict so callers do not get defaultdict behavior.
    return dict(result)
169
170
171 1
def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    """Build a `BadgeAwarding` transfer object from the database entity."""
    awarding_id = entity.id
    badge_id = entity.badge_id
    user_id = entity.user_id
    awarded_at = entity.awarded_at

    return BadgeAwarding(awarding_id, badge_id, user_id, awarded_at)
178