Passed
Push — main ( 7fb18a...baf70b )
by Jochen
04:45
created

get_attendees_by_party()   A

Complexity

Conditions 3

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 10.0792

Importance

Changes 0
Metric Value
cc 3
eloc 16
dl 0
loc 26
rs 9.6
c 0
b 0
f 0
nop 1
ccs 1
cts 13
cp 0.0769
crap 10.0792
1
"""
2
byceps.services.ticketing.attendance_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from collections import Counter, defaultdict
11 1
from datetime import datetime
12 1
from itertools import chain
13
14 1
from sqlalchemy.dialects.postgresql import insert
15
16 1
from ...database import db, upsert
17 1
from ...typing import BrandID, PartyID, UserID
18
19 1
from ..party.dbmodels.party import Party as DbParty
20 1
from ..party import service as party_service
21 1
from ..party.transfer.models import Party
22
23 1
from .dbmodels.archived_attendance import (
24
    ArchivedAttendance as DbArchivedAttendance,
25
)
26 1
from .dbmodels.category import Category as DbCategory
27 1
from .dbmodels.ticket import Ticket as DbTicket
28
29
30 1
def create_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
    """Create an archived attendance of the user at the party.

    The row is inserted with ``ON CONFLICT DO NOTHING`` on the table's
    primary key constraint, so a row that conflicts on that key is
    silently skipped instead of raising an error.

    The session is committed afterwards, consistent with
    `delete_archived_attendance()`.
    """
    table = DbArchivedAttendance.__table__

    query = (
        insert(table)
        .values(
            {
                'user_id': str(user_id),
                'party_id': str(party_id),
            }
        )
        .on_conflict_do_nothing(constraint=table.primary_key)
    )

    db.session.execute(query)
    # Persist the change; without a commit the insert only lives in the
    # current transaction.
    db.session.commit()
41
42
43 1
def delete_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
    """Remove the archived attendance of the user at the party.

    The deletion is committed right away.
    """
    matching_attendances = db.session.query(DbArchivedAttendance).filter_by(
        user_id=user_id, party_id=party_id
    )
    matching_attendances.delete()

    db.session.commit()
49
50
51 1
def get_attended_parties(user_id: UserID) -> list[Party]:
    """Return the parties the user has attended in the past.

    Party IDs derived from tickets and from archived attendances are
    combined into one set, so a party found via both is only requested
    once from the party service.
    """
    attended_party_ids = _get_attended_party_ids(user_id)
    archived_party_ids = _get_archived_attendance_party_ids(user_id)

    all_party_ids = attended_party_ids | archived_party_ids

    return party_service.get_parties(all_party_ids)
61
62
63 1
def _get_attended_party_ids(user_id: UserID) -> set[PartyID]:
    """Return the IDs of the non-legacy parties the user has attended.

    A party is included if it has already ended (compared against the
    current naive UTC time) and a non-revoked ticket for it is used by
    the user.
    """
    now = datetime.utcnow()

    rows = (
        db.session.query(DbParty.id)
        .filter(DbParty.ends_at < now)
        .join(DbCategory)
        .join(DbTicket)
        # `== False` is intentional: it builds an SQL expression.
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id == user_id)
        .all()
    )

    # Each row holds a single column, the party ID.
    return {row[0] for row in rows}
75
76
77 1
def _get_archived_attendance_party_ids(user_id: UserID) -> set[PartyID]:
    """Return the IDs of the legacy parties the user has attended.

    These are the party IDs of all archived attendance records that
    belong to the user.
    """
    rows = (
        db.session.query(DbArchivedAttendance.party_id)
        .filter(DbArchivedAttendance.user_id == user_id)
        .all()
    )

    # Each row holds a single column, the party ID.
    return {row[0] for row in rows}
85
86
87 1
def get_attendee_ids_for_party(party_id: PartyID) -> set[UserID]:
    """Return the party's attendees' IDs.

    Attendees are the users of the party's non-revoked tickets plus the
    users with an archived attendance for the party. A user found via
    both sources is contained only once.
    """
    ticket_rows = (
        db.session.query(DbTicket.used_by_id)
        .join(DbCategory)
        .filter(DbCategory.party_id == party_id)
        # `== False` / `!= None` are intentional: they build SQL expressions.
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id != None)  # noqa: E711
        .all()
    )

    archived_attendance_rows = (
        db.session.query(DbArchivedAttendance.user_id)
        .filter(DbArchivedAttendance.party_id == party_id)
        .all()
    )

    rows = ticket_rows + archived_attendance_rows

    # Every row is a one-element result row; unwrap it so that the set
    # contains the user IDs themselves rather than the row objects.
    return {row[0] for row in rows}
104
105
106 1
def get_top_attendees_for_brand(brand_id: BrandID) -> list[tuple[UserID, int]]:
    """Return the attendees with the highest number of parties of this
    brand visited.

    Ticket-based and archived attendance counts are added up per user.
    Of the 50 users with the highest totals, only those with more than
    one attendance are returned, as `(user ID, attendance count)` pairs
    ordered from highest to lowest count.
    """
    top_ticket_attendance_counts = _get_top_ticket_attendees_for_parties(
        brand_id
    )

    top_archived_attendance_counts = _get_top_archived_attendees_for_parties(
        brand_id
    )

    top_attendance_counts = _merge_top_attendance_counts(
        [top_ticket_attendance_counts, top_archived_attendance_counts]
    )

    # Select top attendees with more than one attendance.
    return [
        (user_id, attendance_count)
        for user_id, attendance_count in top_attendance_counts.most_common(50)
        if attendance_count > 1
    ]
134
135
136 1
def _get_top_ticket_attendees_for_parties(
    brand_id: BrandID,
) -> list[tuple[UserID, int]]:
    """Return ticket users with their attendance counts for the brand.

    The count is a scalar subquery over the distinct party IDs of the
    brand for which a non-revoked ticket is used by the same user as the
    aliased ticket of the outer query. Only rows with a user and a count
    greater than zero are returned, highest count first.
    """
    # Separate ticket alias for the outer query; the subquery compares
    # its own tickets' users against this column.
    ticket_user_id = db.aliased(DbTicket).used_by_id

    party_count = (
        db.session.query(
            db.func.count(DbCategory.party_id.distinct()),
        )
        .join(DbParty)
        .filter(DbParty.brand_id == brand_id)
        .join(DbTicket)
        # `== False` is intentional: it builds an SQL expression.
        .filter(DbTicket.revoked == False)  # noqa: E712
        .filter(DbTicket.used_by_id == ticket_user_id)
        .subquery()
        .as_scalar()
    )

    return (
        db.session.query(
            ticket_user_id.distinct(),
            party_count,
        )
        # `!= None` is intentional: it builds an SQL expression.
        .filter(ticket_user_id != None)  # noqa: E711
        .filter(party_count > 0)
        .order_by(party_count.desc())
        .all()
    )
162
163
164 1
def _get_top_archived_attendees_for_parties(
    brand_id: BrandID,
) -> list[tuple[UserID, int]]:
    """Return users with their archived attendance counts for the brand.

    Archived attendances are joined with their parties, restricted to
    the brand, and counted per user. Rows are ordered by count, highest
    first.
    """
    archived_count = db.func.count(DbArchivedAttendance.user_id).label(
        'attendance_count'
    )

    return (
        db.session.query(
            DbArchivedAttendance.user_id,
            archived_count,
        )
        .join(DbParty)
        .filter(DbParty.brand_id == brand_id)
        .group_by(DbArchivedAttendance.user_id)
        .order_by(archived_count.desc())
        .all()
    )
181
182
183 1
def _merge_top_attendance_counts(
    xs: list[list[tuple[UserID, int]]]
) -> Counter[UserID]:
    """Add up several lists of `(user ID, count)` pairs per user.

    Each list is turned into a mapping first and then added to the
    running totals, so counts for the same user from different lists
    are summed.
    """
    totals: Counter = Counter()

    for counts_by_user_id in map(dict, xs):
        totals.update(counts_by_user_id)

    return totals
192