1
|
|
|
"""
byceps.services.user_badge.awarding_service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2020 Jochen Kupperschmidt
:License: Modified BSD, see LICENSE for details.
"""
8
|
|
|
|
9
|
1 |
|
from collections import defaultdict |
10
|
1 |
|
from datetime import datetime |
11
|
1 |
|
from typing import Dict, Optional, Set, Tuple |
12
|
|
|
|
13
|
1 |
|
from ...database import db |
14
|
1 |
|
from ...events.user_badge import UserBadgeAwarded |
15
|
1 |
|
from ...typing import UserID |
16
|
|
|
|
17
|
1 |
|
from ..user import event_service |
18
|
|
|
|
19
|
1 |
|
from .badge_service import _db_entity_to_badge |
20
|
1 |
|
from .models.awarding import BadgeAwarding as DbBadgeAwarding |
21
|
1 |
|
from .models.badge import Badge as DbBadge |
22
|
1 |
|
from .transfer.models import ( |
23
|
|
|
Badge, |
24
|
|
|
BadgeAwarding, |
25
|
|
|
BadgeID, |
26
|
|
|
QuantifiedBadgeAwarding, |
27
|
|
|
) |
28
|
|
|
|
29
|
|
|
|
30
|
1 |
|
def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> Tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user.

    Store the awarding and a matching 'user-badge-awarded' user event
    with a single commit, then return the awarding as a `BadgeAwarding`
    together with a `UserBadgeAwarded` event describing it.
    """
    # One timestamp is shared by the awarding and both events.
    now = datetime.utcnow()

    db_awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=now)
    db.session.add(db_awarding)

    # The initiator is only recorded in the event data if one was given.
    data = {'badge_id': str(badge_id)}
    if initiator_id:
        data['initiator_id'] = str(initiator_id)

    db_event = event_service.build_event(
        'user-badge-awarded', user_id, data, occurred_at=now
    )
    db.session.add(db_event)

    db.session.commit()

    return (
        _db_entity_to_badge_awarding(db_awarding),
        UserBadgeAwarded(
            occurred_at=now,
            user_id=user_id,
            badge_id=badge_id,
            initiator_id=initiator_id,
        ),
    )
59
|
|
|
|
60
|
|
|
|
61
|
1 |
|
def count_awardings() -> Dict[BadgeID, int]:
    """Return how often each badge has been awarded, keyed by badge ID.

    A user can receive the same badge more than once, so these numbers
    count awardings, not distinct awardees.
    """
    query = db.session.query(DbBadge.id, db.func.count(DbBadgeAwarding.id))

    # Outer join starting from the badge table: badges that have never
    # been awarded are still part of the result.
    query = query.outerjoin(DbBadgeAwarding).group_by(DbBadge.id)

    counts_by_badge_id = {}
    for badge_id, awarding_count in query.all():
        counts_by_badge_id[badge_id] = awarding_count

    return counts_by_badge_id
77
|
|
|
|
78
|
|
|
|
79
|
1 |
|
def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge, one entry per awardee.

    Each entry states how many times that user was awarded the badge.
    """
    quantity_column = db.func.count(DbBadgeAwarding.badge_id)
    grouping_columns = (DbBadgeAwarding.badge_id, DbBadgeAwarding.user_id)

    rows = db.session \
        .query(*grouping_columns, quantity_column) \
        .filter(DbBadgeAwarding.badge_id == badge_id) \
        .group_by(*grouping_columns) \
        .all()

    awardings = set()
    for row_badge_id, row_user_id, quantity in rows:
        awardings.add(
            QuantifiedBadgeAwarding(row_badge_id, row_user_id, quantity)
        )

    return awardings
98
|
|
|
|
99
|
|
|
|
100
|
1 |
|
def get_badges_awarded_to_user(user_id: UserID) -> Dict[Badge, int]:
    """Return the badges awarded to the user, each mapped to the number
    of times it was awarded to them.
    """
    rows = db.session \
        .query(
            DbBadgeAwarding.badge_id,
            db.func.count(DbBadgeAwarding.badge_id)
        ) \
        .filter(DbBadgeAwarding.user_id == user_id) \
        .group_by(DbBadgeAwarding.badge_id) \
        .all()

    quantities_by_badge_id = {
        badge_id: quantity for badge_id, quantity in rows
    }

    # Nothing awarded: skip the badge lookup altogether.
    if not quantities_by_badge_id:
        return {}

    db_badges = DbBadge.query \
        .filter(DbBadge.id.in_(set(quantities_by_badge_id))) \
        .all()

    return {
        _db_entity_to_badge(db_badge): quantities_by_badge_id[db_badge.id]
        for db_badge in db_badges
    }
130
|
|
|
|
131
|
|
|
|
132
|
1 |
|
def get_badges_awarded_to_users(
    user_ids: Set[UserID], *, featured_only: bool = False
) -> Dict[UserID, Set[Badge]]:
    """Return all badges that have been awarded to the users, indexed
    by user ID.

    If `featured_only` is `True`, only return featured badges.

    Users for whom no badge remains are not part of the result. An
    empty `user_ids` yields an empty dictionary without querying.
    """
    if not user_ids:
        return {}

    # `get_badges` is neither imported nor defined at module level, so
    # calling it used to raise `NameError`.
    # NOTE(review): it is assumed to be provided by the sibling
    # `badge_service` module (from which this module already imports
    # `_db_entity_to_badge`) -- confirm.
    from .badge_service import get_badges

    awardings = DbBadgeAwarding.query \
        .filter(DbBadgeAwarding.user_id.in_(user_ids)) \
        .all()

    badge_ids = {awarding.badge_id for awarding in awardings}
    badges = get_badges(badge_ids, featured_only=featured_only)
    badges_by_id = {badge.id: badge for badge in badges}

    badges_by_user_id: Dict[UserID, Set[Badge]] = defaultdict(set)
    for awarding in awardings:
        # Awardings whose badge was not returned by `get_badges` are
        # skipped.
        badge = badges_by_id.get(awarding.badge_id)
        if badge:
            badges_by_user_id[awarding.user_id].add(badge)

    # Hand out a plain dict so lookups of unknown users raise `KeyError`
    # instead of silently creating entries.
    return dict(badges_by_user_id)
158
|
|
|
|
159
|
|
|
|
160
|
1 |
|
def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    """Build a `BadgeAwarding` from the database entity's ID, badge ID,
    user ID, and awarding timestamp.
    """
    fields = (
        entity.id,
        entity.badge_id,
        entity.user_id,
        entity.awarded_at,
    )
    return BadgeAwarding(*fields)
167
|
|
|
|