1
|
|
|
""" |
2
|
|
|
byceps.services.user_badge.awarding_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2021 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from __future__ import annotations |
10
|
1 |
|
from collections import defaultdict |
11
|
1 |
|
from datetime import datetime |
12
|
1 |
|
from typing import Optional |
13
|
|
|
|
14
|
1 |
|
from ...database import db |
15
|
1 |
|
from ...events.user_badge import UserBadgeAwarded |
16
|
1 |
|
from ...typing import UserID |
17
|
|
|
|
18
|
1 |
|
from ..user import event_service, service as user_service |
19
|
|
|
|
20
|
1 |
|
from .badge_service import _db_entity_to_badge, get_badge, get_badges |
21
|
1 |
|
from .dbmodels.awarding import BadgeAwarding as DbBadgeAwarding |
22
|
1 |
|
from .dbmodels.badge import Badge as DbBadge |
23
|
1 |
|
from .transfer.models import ( |
24
|
|
|
Badge, |
25
|
|
|
BadgeAwarding, |
26
|
|
|
BadgeID, |
27
|
|
|
QuantifiedBadgeAwarding, |
28
|
|
|
) |
29
|
|
|
|
30
|
|
|
|
31
|
1 |
|
def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user.

    The awarding and a matching 'user-badge-awarded' user event are
    added to the session and persisted with a single commit.

    Return the awarding as well as a `UserBadgeAwarded` event that
    describes it.
    """
    badge = get_badge(badge_id)
    awardee = user_service.get_user(user_id)
    now = datetime.utcnow()

    initiator = (
        user_service.get_user(initiator_id)
        if initiator_id is not None
        else None
    )

    db_awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=now)
    db.session.add(db_awarding)

    # IDs are stored as strings in the user event's data; the
    # initiator is only recorded if one was given.
    user_event_data = {'badge_id': str(badge_id)}
    if initiator_id:
        user_event_data['initiator_id'] = str(initiator_id)
    user_event = event_service.build_event(
        'user-badge-awarded', user_id, user_event_data, occurred_at=now
    )
    db.session.add(user_event)

    db.session.commit()

    awarding = _db_entity_to_badge_awarding(db_awarding)

    if initiator:
        event_initiator_id = initiator.id
        event_initiator_screen_name = initiator.screen_name
    else:
        event_initiator_id = None
        event_initiator_screen_name = None

    awarded_event = UserBadgeAwarded(
        occurred_at=now,
        initiator_id=event_initiator_id,
        initiator_screen_name=event_initiator_screen_name,
        user_id=user_id,
        user_screen_name=awardee.screen_name,
        badge_id=badge_id,
        badge_label=badge.label,
    )

    return awarding, awarded_event
70
|
|
|
|
71
|
|
|
|
72
|
1 |
|
def count_awardings() -> dict[BadgeID, int]:
    """Return the number of awardings per badge, indexed by badge ID.

    A badge can be awarded to the same user more than once, so these
    numbers count awardings, not distinct awardees.
    """
    # Badges are outer-joined with their awardings and grouped by
    # badge ID, yielding one (badge ID, awarding count) row per badge.
    query = (
        db.session
        .query(DbBadge.id, db.func.count(DbBadgeAwarding.id))
        .outerjoin(DbBadgeAwarding)
        .group_by(DbBadge.id)
    )

    return {row[0]: row[1] for row in query.all()}
88
|
|
|
|
89
|
|
|
|
90
|
1 |
|
def get_awardings_of_badge(badge_id: BadgeID) -> set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge.

    Awardings are grouped by user, so each element carries the badge
    ID, the user ID, and how often that user was awarded the badge.
    """
    rows = (
        db.session
        .query(
            DbBadgeAwarding.badge_id,
            DbBadgeAwarding.user_id,
            db.func.count(DbBadgeAwarding.badge_id),
        )
        .filter(DbBadgeAwarding.badge_id == badge_id)
        .group_by(DbBadgeAwarding.badge_id, DbBadgeAwarding.user_id)
        .all()
    )

    quantified_awardings = set()
    for awarded_badge_id, awardee_id, quantity in rows:
        quantified_awardings.add(
            QuantifiedBadgeAwarding(awarded_badge_id, awardee_id, quantity)
        )

    return quantified_awardings
109
|
|
|
|
110
|
|
|
|
111
|
1 |
|
def get_badges_awarded_to_user(user_id: UserID) -> dict[Badge, int]:
    """Return all badges that have been awarded to the user, each
    mapped to the number of times it was awarded.
    """
    rows = (
        db.session
        .query(
            DbBadgeAwarding.badge_id,
            db.func.count(DbBadgeAwarding.badge_id),
        )
        .filter(DbBadgeAwarding.user_id == user_id)
        .group_by(DbBadgeAwarding.badge_id)
        .all()
    )

    quantities_by_badge_id = {row[0]: row[1] for row in rows}

    # Nothing awarded: skip the badge lookup entirely.
    if not quantities_by_badge_id:
        return {}

    db_badges = (
        DbBadge.query
        .filter(DbBadge.id.in_(set(quantities_by_badge_id.keys())))
        .all()
    )

    return {
        _db_entity_to_badge(db_badge): quantities_by_badge_id[db_badge.id]
        for db_badge in db_badges
    }
141
|
|
|
|
142
|
|
|
|
143
|
1 |
|
def get_badges_awarded_to_users(
    user_ids: set[UserID], *, featured_only: bool = False
) -> dict[UserID, set[Badge]]:
    """Return all badges that have been awarded to the users, indexed
    by user ID.

    If `featured_only` is `True`, only return featured badges.

    Users for whom no badge is found are not included in the result.
    """
    if not user_ids:
        return {}

    db_awardings = (
        DbBadgeAwarding.query
        .filter(DbBadgeAwarding.user_id.in_(user_ids))
        .all()
    )

    awarded_badge_ids = {
        db_awarding.badge_id for db_awarding in db_awardings
    }
    badges_by_id = {
        badge.id: badge
        for badge in get_badges(awarded_badge_ids, featured_only=featured_only)
    }

    badges_by_user_id: dict[UserID, set[Badge]] = {}
    for db_awarding in db_awardings:
        badge = badges_by_id.get(db_awarding.badge_id)
        if not badge:
            # Badge was not returned by the lookup; leave it out.
            continue
        badges_by_user_id.setdefault(db_awarding.user_id, set()).add(badge)

    return badges_by_user_id
169
|
|
|
|
170
|
|
|
|
171
|
1 |
|
def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    """Build a `BadgeAwarding` from the database entity's ID, badge ID,
    user ID, and awarding timestamp.
    """
    awarding_id = entity.id
    badge_id = entity.badge_id
    user_id = entity.user_id
    awarded_at = entity.awarded_at

    return BadgeAwarding(awarding_id, badge_id, user_id, awarded_at)
178
|
|
|
|