Completed
Push — main ( 820682...4c245a )
by Jochen
05:14
created

byceps/services/user_badge/awarding_service.py (1 issue)

1
"""
2
byceps.services.user_badge.awarding_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from collections import defaultdict
10 1
from datetime import datetime
11 1
from typing import Dict, Optional, Set, Tuple
12
13 1
from ...database import db
14 1
from ...events.user_badge import UserBadgeAwarded
15 1
from ...typing import UserID
16
17 1
from ..user import event_service
18
19 1
from .badge_service import _db_entity_to_badge
20 1
from .models.awarding import BadgeAwarding as DbBadgeAwarding
21 1
from .models.badge import Badge as DbBadge
22 1
from .transfer.models import (
23
    Badge,
24
    BadgeAwarding,
25
    BadgeID,
26
    QuantifiedBadgeAwarding,
27
)
28
29
30 1
def award_badge_to_user(
    badge_id: BadgeID, user_id: UserID, *, initiator_id: Optional[UserID] = None
) -> Tuple[BadgeAwarding, UserBadgeAwarded]:
    """Award the badge to the user.

    The awarding and a ``user-badge-awarded`` user event are added to
    the database session and committed together. Both share the same
    timestamp.

    Return the awarding as a transfer object along with a
    ``UserBadgeAwarded`` event describing it.
    """
    now = datetime.utcnow()

    db_awarding = DbBadgeAwarding(badge_id, user_id, awarded_at=now)
    db.session.add(db_awarding)

    # The initiator is only included in the event data if a truthy
    # initiator ID was passed.
    user_event_data = {'badge_id': str(badge_id)}
    if initiator_id:
        user_event_data['initiator_id'] = str(initiator_id)

    user_event = event_service.build_event(
        'user-badge-awarded', user_id, user_event_data, occurred_at=now
    )
    db.session.add(user_event)

    db.session.commit()

    return (
        _db_entity_to_badge_awarding(db_awarding),
        UserBadgeAwarded(
            occurred_at=now,
            user_id=user_id,
            badge_id=badge_id,
            initiator_id=initiator_id,
        ),
    )
59
60
61 1
def count_awardings() -> Dict[BadgeID, int]:
    """Return the number of awardings per badge, keyed by badge ID.

    A badge can be awarded to the same user more than once, so these
    numbers are awardings, not distinct awardees.
    """
    # Awardings are outer-joined onto badges and grouped by badge ID.
    query = db.session.query(DbBadge.id, db.func.count(DbBadgeAwarding.id))
    query = query.outerjoin(DbBadgeAwarding)
    query = query.group_by(DbBadge.id)

    awarding_counts_by_badge_id = {}
    for badge_id, awarding_count in query.all():
        awarding_counts_by_badge_id[badge_id] = awarding_count

    return awarding_counts_by_badge_id
77
78
79 1
def get_awardings_of_badge(badge_id: BadgeID) -> Set[QuantifiedBadgeAwarding]:
    """Return the awardings of this badge.

    Awardings are grouped by badge and user, so each element of the
    returned set carries the number of times that user has been
    awarded the badge.
    """
    query = db.session.query(
        DbBadgeAwarding.badge_id,
        DbBadgeAwarding.user_id,
        db.func.count(DbBadgeAwarding.badge_id),
    )
    query = query.filter(DbBadgeAwarding.badge_id == badge_id)
    query = query.group_by(DbBadgeAwarding.badge_id, DbBadgeAwarding.user_id)

    quantified_awardings = set()
    for row_badge_id, row_user_id, quantity in query.all():
        quantified_awardings.add(
            QuantifiedBadgeAwarding(row_badge_id, row_user_id, quantity)
        )

    return quantified_awardings
98
99
100 1
def get_badges_awarded_to_user(user_id: UserID) -> Dict[Badge, int]:
    """Return the badges awarded to the user, each mapped to the number
    of times it has been awarded to them.
    """
    quantity_query = db.session.query(
        DbBadgeAwarding.badge_id,
        db.func.count(DbBadgeAwarding.badge_id),
    )
    quantity_query = quantity_query.filter(DbBadgeAwarding.user_id == user_id)
    quantity_query = quantity_query.group_by(DbBadgeAwarding.badge_id)

    quantities_by_badge_id = {}
    for row in quantity_query.all():
        quantities_by_badge_id[row[0]] = row[1]

    # Nothing awarded: skip the badge lookup entirely.
    if not quantities_by_badge_id:
        return {}

    awarded_badge_ids = set(quantities_by_badge_id)
    db_badges = DbBadge.query \
        .filter(DbBadge.id.in_(awarded_badge_ids)) \
        .all()

    quantities_by_badge = {}
    for db_badge in db_badges:
        quantity = quantities_by_badge_id[db_badge.id]
        quantities_by_badge[_db_entity_to_badge(db_badge)] = quantity

    return quantities_by_badge
130
131
132 1
def get_badges_awarded_to_users(
    user_ids: Set[UserID], *, featured_only: bool = False
) -> Dict[UserID, Set[Badge]]:
    """Return all badges that have been awarded to the users, indexed
    by user ID.

    If `featured_only` is `True`, only return featured badges.

    Users for whom no (matching) badge was found are absent from the
    returned dictionary. An empty `user_ids` yields an empty dictionary
    without querying the database.
    """
    # `get_badges` was called here without being imported or defined in
    # this module, which raised `NameError` for any non-empty input.
    # NOTE(review): assumes `badge_service` provides `get_badges` with a
    # `featured_only` keyword argument -- confirm.
    from .badge_service import get_badges

    if not user_ids:
        return {}

    awardings = DbBadgeAwarding.query \
        .filter(DbBadgeAwarding.user_id.in_(user_ids)) \
        .all()

    badge_ids = {awarding.badge_id for awarding in awardings}
    badges = get_badges(badge_ids, featured_only=featured_only)
    badges_by_id = {badge.id: badge for badge in badges}

    badges_by_user_id: Dict[UserID, Set[Badge]] = defaultdict(set)
    for awarding in awardings:
        # Awardings whose badge was not returned by `get_badges` (e.g.
        # filtered out by `featured_only`) are skipped.
        badge = badges_by_id.get(awarding.badge_id)
        if badge:
            badges_by_user_id[awarding.user_id].add(badge)

    # Hand out a plain dict so lookups of unknown users raise `KeyError`
    # instead of silently creating empty sets.
    return dict(badges_by_user_id)
158
159
160 1
def _db_entity_to_badge_awarding(entity: DbBadgeAwarding) -> BadgeAwarding:
    """Build a `BadgeAwarding` transfer object from the database entity."""
    # Positional order: ID, badge ID, user ID, awarding timestamp.
    field_values = (
        entity.id,
        entity.badge_id,
        entity.user_id,
        entity.awarded_at,
    )
    return BadgeAwarding(*field_values)
167