Passed
Push — master ( 96ba35...87f3f9 )
by Jochen
02:23
created

byceps.services.ticketing.attendance_service._get_top_archived_attendees_for_parties()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 13
nop 1
dl 0
loc 16
rs 9.75
c 0
b 0
f 0
1
"""
2
byceps.services.ticketing.attendance_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2019 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from collections import Counter, defaultdict
10
from datetime import datetime
11
from itertools import chain
12
import typing
13
from typing import Dict, List, Set, Tuple
14
15
from sqlalchemy.dialects.postgresql import insert
16
17
from ...database import db, upsert
18
from ...typing import BrandID, PartyID, UserID
19
20
from ..party.models.party import Party as DbParty
21
from ..party import service as party_service
22
from ..party.transfer.models import Party
23
from ..user import service as user_service
24
from ..user.transfer.models import User
25
26
from .models.archived_attendance import (
27
    ArchivedAttendance as DbArchivedAttendance,
28
)
29
from .models.category import Category as DbCategory
30
from .models.ticket import Ticket as DbTicket
31
32
33
def create_archived_attendance(user_id: UserID, party_id: PartyID) -> None:
    """Record an archived attendance of the user at the party.

    The row is inserted with PostgreSQL's ``ON CONFLICT DO NOTHING``
    bound to the table's primary key constraint, so a row that
    conflicts on that constraint is skipped instead of raising.

    The statement is only executed on the session; this function does
    not commit.
    """
    table = DbArchivedAttendance.__table__

    # Both IDs are stored in their string representation.
    values = {
        'user_id': str(user_id),
        'party_id': str(party_id),
    }

    statement = (
        insert(table)
        .values(values)
        .on_conflict_do_nothing(constraint=table.primary_key)
    )

    db.session.execute(statement)
44
45
46
def get_attended_parties(user_id: UserID) -> List[Party]:
    """Return the parties the user has attended in the past.

    Party IDs found via used tickets and via archived attendances are
    merged into one set (so each ID is looked up only once) and then
    resolved to parties by the party service.
    """
    ids_from_tickets = _get_attended_party_ids(user_id)
    ids_from_archive = _get_archived_attendance_party_ids(user_id)

    party_ids = set(ids_from_tickets).union(ids_from_archive)

    return party_service.get_parties(party_ids)
56
57
58
def _get_attended_party_ids(user_id: UserID) -> Set[PartyID]:
    """Return the IDs of the non-legacy parties the user has attended.

    A party is included if its end lies before the current (naive UTC)
    time and at least one ticket in one of its categories was used by
    the user.
    """
    now = datetime.utcnow()

    rows = (
        db.session
        .query(DbParty.id)
        .filter(DbParty.ends_at < now)
        .join(DbCategory)
        .join(DbTicket)
        .filter(DbTicket.used_by_id == user_id)
        .all()
    )

    # Each row is a one-element tuple holding the party ID.
    return {party_id for (party_id,) in rows}
69
70
71
def _get_archived_attendance_party_ids(user_id: UserID) -> Set[PartyID]:
    """Return the IDs of the legacy parties the user has attended.

    These are the party IDs of all archived attendance rows stored for
    the user.
    """
    rows = (
        db.session
        .query(DbArchivedAttendance.party_id)
        .filter(DbArchivedAttendance.user_id == user_id)
        .all()
    )

    # Each row is a one-element tuple holding the party ID.
    return {party_id for (party_id,) in rows}
79
80
81
def get_attendees_by_party(party_ids: Set[PartyID]) -> Dict[PartyID, Set[User]]:
    """Return the parties' attendees, indexed by party.

    Every requested party ID becomes a key of the result; a party for
    which no attendee IDs were found maps to an empty set. An empty
    `party_ids` yields an empty dictionary without any lookups.

    All attendees are fetched in one call (avatars included) and then
    distributed to their parties.
    """
    if not party_ids:
        return {}

    user_ids_by_party_id = get_attendee_ids_for_parties(party_ids)

    # Load every distinct user once, no matter how many parties they
    # attended.
    distinct_user_ids = set(
        chain.from_iterable(user_ids_by_party_id.values())
    )
    users = user_service.find_users(distinct_user_ids, include_avatars=True)
    users_by_id = user_service.index_users_by_id(users)

    # NOTE(review): the index lookup presumably raises `KeyError` if an
    # attendee ID has no matching user in the index -- confirm.
    return {
        party_id: {
            users_by_id[user_id]
            for user_id in user_ids_by_party_id.get(party_id, set())
        }
        for party_id in party_ids
    }
107
108
109
def get_attendee_ids_for_parties(
    party_ids: Set[PartyID]
) -> Dict[PartyID, Set[UserID]]:
    """Return the parties' attendee IDs, indexed by party ID.

    Attendee IDs come from two sources: tickets that have been used by
    a user, and archived attendances. Only parties for which at least
    one attendee was found appear in the result. An empty `party_ids`
    yields an empty dictionary without querying the database.
    """
    if not party_ids:
        return {}

    # `!= None` is intentional: SQLAlchemy overloads the operator and
    # renders it as `IS NOT NULL`.
    rows_from_tickets = (
        db.session
        .query(DbCategory.party_id, DbTicket.used_by_id)
        .filter(DbCategory.party_id.in_(party_ids))
        .join(DbTicket)
        .filter(DbTicket.used_by_id != None)  # noqa: E711
        .all()
    )

    rows_from_archive = (
        db.session
        .query(
            DbArchivedAttendance.party_id,
            DbArchivedAttendance.user_id,
        )
        .filter(DbArchivedAttendance.party_id.in_(party_ids))
        .all()
    )

    # Group the (party ID, user ID) pairs of both sources by party ID.
    grouped: Dict[PartyID, Set[UserID]] = {}
    for party_id, user_id in chain(rows_from_tickets, rows_from_archive):
        grouped.setdefault(party_id, set()).add(user_id)

    return grouped
138
139
140
def get_top_attendees_for_brand(
    brand_id: BrandID, *, limit: int = 50
) -> List[Tuple[UserID, int]]:
    """Return the attendees with the highest number of parties of this
    brand visited.

    Attendance counts derived from used tickets and from archived
    attendances are summed per user.

    :param brand_id: The brand whose parties are considered.
    :param limit: Maximum number of attendees to return (default: 50).
    :return: `(user ID, attendance count)` pairs, highest count first.
        Empty if the brand has no parties.
    """
    parties = party_service.get_parties_for_brand(brand_id)
    party_ids = {p.id for p in parties}

    if not party_ids:
        # No parties means no attendances; skip both queries rather
        # than sending `IN ()` predicates with an empty set.
        return []

    top_ticket_attendance_counts = _get_top_ticket_attendees_for_parties(
        party_ids
    )

    top_archived_attendance_counts = _get_top_archived_attendees_for_parties(
        party_ids
    )

    top_attendance_counts = _merge_top_attendance_counts(
        [top_ticket_attendance_counts, top_archived_attendance_counts]
    )

    return top_attendance_counts.most_common(limit)
160
161
162
def _get_top_ticket_attendees_for_parties(
    party_ids: Set[PartyID]
) -> List[Tuple[UserID, int]]:
    """Return `(user ID, count)` rows based on used tickets.

    Considers tickets in categories of the given parties that have
    been used by a user, grouped by that user. The count is the number
    of such tickets per user; rows are ordered by count, highest first.
    """
    used_ticket_count = db.func.count(DbTicket.used_by_id)
    used_ticket_count = used_ticket_count.label('attendance_count')

    # `!= None` is intentional: SQLAlchemy overloads the operator and
    # renders it as `IS NOT NULL`.
    query = (
        db.session
        .query(
            DbTicket.used_by_id,
            used_ticket_count,
        )
        .join(DbCategory)
        .filter(DbCategory.party_id.in_(party_ids))
        .filter(DbTicket.used_by_id != None)  # noqa: E711
        .group_by(DbTicket.used_by_id)
        .order_by(used_ticket_count.desc())
    )

    return query.all()
180
181
182
def _get_top_archived_attendees_for_parties(
    party_ids: Set[PartyID]
) -> List[Tuple[UserID, int]]:
    """Return `(user ID, count)` rows based on archived attendances.

    Counts the archived attendance rows of the given parties per user;
    rows are ordered by count, highest first.
    """
    archived_count = db.func.count(DbArchivedAttendance.user_id)
    archived_count = archived_count.label('attendance_count')

    query = (
        db.session
        .query(
            DbArchivedAttendance.user_id,
            archived_count,
        )
        .filter(DbArchivedAttendance.party_id.in_(party_ids))
        .group_by(DbArchivedAttendance.user_id)
        .order_by(archived_count.desc())
    )

    return query.all()
198
199
200
def _merge_top_attendance_counts(
    xs: List[List[Tuple[UserID, int]]]
) -> typing.Counter[UserID]:
    """Sum the attendance counts of all given lists per user.

    Each list of `(user ID, count)` pairs is first turned into a
    dictionary and then added onto a shared counter, so a user that
    appears in several lists ends up with the sum of their counts.
    """
    totals: typing.Counter[UserID] = Counter()

    for counts_by_user_id in map(dict, xs):
        # `Counter.update` adds to existing counts instead of
        # replacing them.
        totals.update(counts_by_user_id)

    return totals
209