1 | """ |
||
2 | byceps.services.user_badge.awarding_service |
||
3 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
||
4 | |||
5 | :Copyright: 2006-2020 Jochen Kupperschmidt |
||
6 | :License: Modified BSD, see LICENSE for details. |
||
7 | """ |
||
8 | |||
9 | 1 | from collections import defaultdict |
|
10 | 1 | from datetime import datetime |
|
11 | 1 | from typing import Dict, Optional, Set, Tuple |
|
12 | |||
13 | 1 | from ...database import db |
|
14 | 1 | from ...events.user_badge import UserBadgeAwarded |
|
15 | 1 | from ...typing import UserID |
|
16 | |||
17 | 1 | from ..user import event_service |
|
18 | |||
19 | 1 | from .badge_service import _db_entity_to_badge |
|
20 | 1 | from .models.awarding import BadgeAwarding as DbBadgeAwarding |
|
21 | 1 | from .models.badge import Badge as DbBadge |
|
22 | 1 | from .transfer.models import ( |
|
23 | Badge, |
||
24 | BadgeAwarding, |
||
25 | BadgeID, |
||
26 | QuantifiedBadgeAwarding, |
||
27 | ) |
||
28 | |||
29 | |||
30 | 1 | def award_badge_to_user( |
|
31 | badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None |
||
32 | ) -> Tuple[BadgeAwarding, UserBadgeAwarded]: |
||
33 | """Award the badge to the user.""" |
||
34 | 1 | awarded_at = datetime.utcnow() |
|
35 | |||
36 | 1 | awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=awarded_at) |
|
37 | 1 | db.session.add(awarding) |
|
38 | |||
39 | 1 | event_data = {'badge_id': str(badge_id)} |
|
40 | 1 | if initiator_id: |
|
41 | 1 | event_data['initiator_id'] = str(initiator_id) |
|
42 | 1 | event = event_service.build_event( |
|
43 | 'user-badge-awarded', user_id, event_data, occurred_at=awarded_at |
||
44 | ) |
||
45 | 1 | db.session.add(event) |
|
46 | |||
47 | 1 | db.session.commit() |
|
48 | |||
49 | 1 | awarding_dto = _db_entity_to_badge_awarding(awarding) |
|
50 | |||
51 | 1 | event = UserBadgeAwarded( |
|
52 | occurred_at=awarded_at, |
||
53 | user_id=user_id, |
||
54 | badge_id=badge_id, |
||
55 | initiator_id=initiator_id, |
||
56 | ) |
||
57 | |||
58 | 1 | return awarding_dto, event |
|
59 | |||
60 | |||
61 | 1 | def count_awardings() -> Dict[BadgeID, int]: |
|
62 | """Return the number of times each badge has been awarded. |
||
63 | |||
64 | Because a badge can be awarded multiple times to a user, the number |
||
65 | of awardings does not represent the number of awardees. |
||
66 | """ |
||
67 | 1 | rows = db.session \ |
|
68 | .query( |
||
69 | DbBadge.id, |
||
70 | db.func.count(DbBadgeAwarding.id) |
||
71 | ) \ |
||
72 | .outerjoin(DbBadgeAwarding) \ |
||
73 | .group_by(DbBadge.id) \ |
||
74 | .all() |
||
75 | |||
76 | 1 | return {badge_id: count for badge_id, count in rows} |
|
77 | |||
78 | |||
79 | 1 | def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]: |
|
80 | """Return the awardings of this badge.""" |
||
81 | 1 | rows = db.session \ |
|
82 | .query( |
||
83 | DbBadgeAwarding.badge_id, |
||
84 | DbBadgeAwarding.user_id, |
||
85 | db.func.count(DbBadgeAwarding.badge_id) |
||
86 | ) \ |
||
87 | .filter(DbBadgeAwarding.badge_id == badge_id) \ |
||
88 | .group_by( |
||
89 | DbBadgeAwarding.badge_id, |
||
90 | DbBadgeAwarding.user_id |
||
91 | ) \ |
||
92 | .all() |
||
93 | |||
94 | 1 | return { |
|
95 | QuantifiedBadgeAwarding(badge_id, user_id, quantity) |
||
96 | for badge_id, user_id, quantity in rows |
||
97 | } |
||
98 | |||
99 | |||
100 | 1 | def get_badges_awarded_to_user(user_id: UserID) -> Dict[Badge, int]: |
|
101 | """Return all badges that have been awarded to the user (and how often).""" |
||
102 | 1 | rows = db.session \ |
|
103 | .query( |
||
104 | DbBadgeAwarding.badge_id, |
||
105 | db.func.count(DbBadgeAwarding.badge_id) |
||
106 | ) \ |
||
107 | .filter(DbBadgeAwarding.user_id == user_id) \ |
||
108 | .group_by( |
||
109 | DbBadgeAwarding.badge_id, |
||
110 | ) \ |
||
111 | .all() |
||
112 | |||
113 | 1 | badge_ids_with_awarding_quantity = {row[0]: row[1] for row in rows} |
|
114 | |||
115 | 1 | badge_ids = set(badge_ids_with_awarding_quantity.keys()) |
|
116 | |||
117 | 1 | if badge_ids: |
|
118 | 1 | badges = DbBadge.query \ |
|
119 | .filter(DbBadge.id.in_(badge_ids)) \ |
||
120 | .all() |
||
121 | else: |
||
122 | badges = [] |
||
123 | |||
124 | 1 | badges_with_awarding_quantity = {} |
|
125 | 1 | for badge in badges: |
|
126 | 1 | quantity = badge_ids_with_awarding_quantity[badge.id] |
|
127 | 1 | badges_with_awarding_quantity[_db_entity_to_badge(badge)] = quantity |
|
128 | |||
129 | 1 | return badges_with_awarding_quantity |
|
130 | |||
131 | |||
132 | 1 | def get_badges_awarded_to_users( |
|
133 | user_ids: Set[UserID], *, featured_only: bool = False |
||
134 | ) -> Dict[UserID, Set[Badge]]: |
||
135 | """Return all badges that have been awarded to the users, indexed |
||
136 | by user ID. |
||
137 | |||
138 | If `featured_only` is `True`, only return featured badges. |
||
139 | """ |
||
140 | if not user_ids: |
||
141 | return {} |
||
142 | |||
143 | awardings = DbBadgeAwarding.query \ |
||
144 | .filter(DbBadgeAwarding.user_id.in_(user_ids)) \ |
||
145 | .all() |
||
146 | |||
147 | badge_ids = {awarding.badge_id for awarding in awardings} |
||
148 | badges = get_badges(badge_ids, featured_only=featured_only) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
149 | badges_by_id = {badge.id: badge for badge in badges} |
||
150 | |||
151 | badges_by_user_id: Dict[UserID, Set[Badge]] = defaultdict(set) |
||
152 | for awarding in awardings: |
||
153 | badge = badges_by_id.get(awarding.badge_id) |
||
154 | if badge: |
||
155 | badges_by_user_id[awarding.user_id].add(badge) |
||
156 | |||
157 | return dict(badges_by_user_id) |
||
158 | |||
159 | |||
160 | 1 | def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding: |
|
161 | 1 | return BadgeAwarding( |
|
162 | entity.id, |
||
163 | entity.badge_id, |
||
164 | entity.user_id, |
||
165 | entity.awarded_at |
||
166 | ) |
||
167 |