Completed
Push — main ( dc9c2e...80557c )
by Jochen
05:20
created

byceps.services.ticketing.attendance_service   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 236
Duplicated Lines 0 %

Test Coverage

Coverage 73.08%

Importance

Changes 0
Metric Value
eloc 148
dl 0
loc 236
ccs 57
cts 78
cp 0.7308
rs 10
c 0
b 0
f 0
wmc 16

11 Functions

Rating   Name   Duplication   Size   Complexity  
A delete_archived_attendance() 0 6 1
A _get_attended_party_ids() 0 12 1
A _get_archived_attendance_party_ids() 0 8 1
A create_archived_attendance() 0 11 1
A get_attendee_ids_for_parties() 0 30 3
A get_attended_parties() 0 10 1
A get_attendees_by_party() 0 26 3
A _get_top_ticket_attendees_for_parties() 0 26 1
A _get_top_archived_attendees_for_parties() 0 17 1
A _merge_top_attendance_counts() 0 9 2
A get_top_attendees_for_brand() 0 28 1
1
"""
2
byceps.services.ticketing.attendance_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from collections import Counter, defaultdict
10 1
from datetime import datetime
11 1
from itertools import chain
12 1
import typing
13 1
from typing import Dict, List, Set, Tuple
14
15 1
from sqlalchemy.dialects.postgresql import insert
16
17 1
from ...database import db, upsert
18 1
from ...typing import BrandID, PartyID, UserID
19
20 1
from ..party.models.party import Party as DbParty
21 1
from ..party import service as party_service
22 1
from ..party.transfer.models import Party
23 1
from ..user import service as user_service
24 1
from ..user.transfer.models import User
25
26 1
from .models.archived_attendance import (
27
    ArchivedAttendance as DbArchivedAttendance,
28
)
29 1
from .models.category import Category as DbCategory
30 1
from .models.ticket import Ticket as DbTicket
31
32
33 1
def create_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
34
    """Create an archived attendance of the user at the party."""
35 1
    table = DbArchivedAttendance.__table__
36
37 1
    query = insert(table) \
38
        .values({
39
            'user_id': str(user_id),
40
            'party_id': str(party_id),
41
        }) \
42
        .on_conflict_do_nothing(constraint=table.primary_key)
43 1
    db.session.execute(query)
44
45
46 1
def delete_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
47
    """Delete the archived attendance of the user at the party."""
48 1
    db.session.query(DbArchivedAttendance) \
49
        .filter_by(user_id=user_id, party_id=party_id) \
50
        .delete()
51 1
    db.session.commit()
52
53
54 1
def get_attended_parties(user_id: UserID) -> List[Party]:
55
    """Return the parties the user has attended in the past."""
56 1
    ticket_attendance_party_ids = _get_attended_party_ids(user_id)
57 1
    archived_attendance_party_ids = _get_archived_attendance_party_ids(user_id)
58
59 1
    party_ids = set(
60
        chain(ticket_attendance_party_ids, archived_attendance_party_ids)
61
    )
62
63 1
    return party_service.get_parties(party_ids)
64
65
66 1
def _get_attended_party_ids(user_id: UserID) -> Set[PartyID]:
67
    """Return the IDs of the non-legacy parties the user has attended."""
68 1
    party_id_rows = db.session \
69
        .query(DbParty.id) \
70
        .filter(DbParty.ends_at < datetime.utcnow()) \
71
        .join(DbCategory) \
72
        .join(DbTicket) \
73
        .filter(DbTicket.revoked == False) \
74
        .filter(DbTicket.used_by_id == user_id) \
75
        .all()
76
77 1
    return {row[0] for row in party_id_rows}
78
79
80 1
def _get_archived_attendance_party_ids(user_id: UserID) -> Set[PartyID]:
81
    """Return the IDs of the legacy parties the user has attended."""
82 1
    party_id_rows = db.session \
83
        .query(DbArchivedAttendance.party_id) \
84
        .filter(DbArchivedAttendance.user_id == user_id) \
85
        .all()
86
87 1
    return {row[0] for row in party_id_rows}
88
89
90 1
def get_attendees_by_party(party_ids: Set[PartyID]) -> Dict[PartyID, Set[User]]:
91
    """Return the parties' attendees, indexed by party."""
92
    if not party_ids:
93
        return {}
94
95
    attendee_ids_by_party_id = get_attendee_ids_for_parties(party_ids)
96
97
    all_attendee_ids = set(
98
        chain.from_iterable(attendee_ids_by_party_id.values())
99
    )
100
    all_attendees = user_service.find_users(
101
        all_attendee_ids, include_avatars=True
102
    )
103
    all_attendees_by_id = user_service.index_users_by_id(all_attendees)
104
105
    attendees_by_party_id = {}
106
    for party_id in party_ids:
107
        attendee_ids = attendee_ids_by_party_id.get(party_id, set())
108
109
        attendees = {
110
            all_attendees_by_id[attendee_id] for attendee_id in attendee_ids
111
        }
112
113
        attendees_by_party_id[party_id] = attendees
114
115
    return attendees_by_party_id
116
117
118 1
def get_attendee_ids_for_parties(
119
    party_ids: Set[PartyID],
120
) -> Dict[PartyID, Set[UserID]]:
121
    """Return the partys' attendee IDs, indexed by party ID."""
122
    if not party_ids:
123
        return {}
124
125
    ticket_rows = db.session \
126
        .query(DbCategory.party_id, DbTicket.used_by_id) \
127
        .filter(DbCategory.party_id.in_(party_ids)) \
128
        .join(DbTicket) \
129
        .filter(DbTicket.revoked == False) \
130
        .filter(DbTicket.used_by_id != None) \
131
        .all()
132
133
    archived_attendance_rows = db.session \
134
        .query(
135
            DbArchivedAttendance.party_id,
136
            DbArchivedAttendance.user_id
137
        ) \
138
        .filter(DbArchivedAttendance.party_id.in_(party_ids)) \
139
        .all()
140
141
    rows = ticket_rows + archived_attendance_rows
142
143
    attendee_ids_by_party_id: Dict[PartyID, Set[UserID]] = defaultdict(set)
144
    for party_id, attendee_id in rows:
145
        attendee_ids_by_party_id[party_id].add(attendee_id)
146
147
    return dict(attendee_ids_by_party_id)
148
149
150 1
def get_top_attendees_for_brand(brand_id: BrandID) -> List[Tuple[UserID, int]]:
151
    """Return the attendees with the highest number of parties of this
152
    brand visited.
153
    """
154 1
    parties = party_service.get_parties_for_brand(brand_id)
155 1
    party_ids = {p.id for p in parties}
156
157 1
    top_ticket_attendance_counts = _get_top_ticket_attendees_for_parties(
158
        brand_id
159
    )
160
161 1
    top_archived_attendance_counts = _get_top_archived_attendees_for_parties(
162
        brand_id
163
    )
164
165 1
    top_attendance_counts = _merge_top_attendance_counts(
166
        [top_ticket_attendance_counts, top_archived_attendance_counts]
167
    )
168
169
    # Select top attendees with more than one attendance.
170 1
    top_attendees = top_attendance_counts.most_common(50)
171 1
    top_attendees = [
172
        (user_id, attendance_count)
173
        for user_id, attendance_count in top_attendees
174
        if attendance_count > 1
175
    ]
176
177 1
    return top_attendees
178
179
180 1
def _get_top_ticket_attendees_for_parties(
181
    brand_id: BrandID,
182
) -> List[Tuple[UserID, int]]:
183 1
    user_id_column = db.aliased(DbTicket).used_by_id
184
185 1
    attendance_count = db.session \
186
        .query(
187
            db.func.count(DbCategory.party_id.distinct()),
188
        ) \
189
        .join(DbParty) \
190
        .filter(DbParty.brand_id == brand_id) \
191
        .join(DbTicket) \
192
        .filter(DbTicket.revoked == False) \
193
        .filter(DbTicket.used_by_id == user_id_column) \
194
        .subquery() \
195
        .as_scalar()
196
197 1
    return db.session \
198
        .query(
199
            user_id_column.distinct(),
200
            attendance_count,
201
        ) \
202
        .filter(user_id_column != None) \
203
        .filter(attendance_count > 0) \
204
        .order_by(attendance_count.desc()) \
205
        .all()
206
207
208 1
def _get_top_archived_attendees_for_parties(
209
    brand_id: BrandID,
210
) -> List[Tuple[UserID, int]]:
211 1
    attendance_count_column = db.func \
212
        .count(DbArchivedAttendance.user_id) \
213
        .label('attendance_count')
214
215 1
    return db.session \
216
        .query(
217
            DbArchivedAttendance.user_id,
218
            attendance_count_column,
219
        ) \
220
        .join(DbParty) \
221
        .filter(DbParty.brand_id == brand_id) \
222
        .group_by(DbArchivedAttendance.user_id) \
223
        .order_by(attendance_count_column.desc()) \
224
        .all()
225
226
227 1
def _merge_top_attendance_counts(
228
    xs: List[List[Tuple[UserID, int]]]
229
) -> typing.Counter[UserID]:
230 1
    counter = Counter()
231
232 1
    for x in xs:
233 1
        counter.update(dict(x))
234
235
    return counter
236