Passed
Push — master ( cfcdc8...cf67f4 )
by Jochen
02:19
created

_generate_attendees()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.attendance.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2019 Jochen Kupperschmidt
6
"""
7
8
from collections import defaultdict
9
from typing import Dict, Iterable, List, Optional, Set, Tuple
10
11
from ...database import db, paginate, Pagination
12
from ...typing import PartyID, UserID
13
14
from ..ticketing.models.ticket import Category as DbCategory, Ticket as DbTicket
15
from ..user.models.user import User as DbUser
16
17
from .transfer.models import Attendee, AttendeeTicket
18
19
20
def get_attendees_paginated(
    party_id: PartyID,
    page: int,
    per_page: int,
    *,
    search_term: Optional[str] = None,
) -> Pagination:
    """Return one page of the party's ticket users as attendees.

    The users of the requested page are fetched first. Their tickets
    are then loaded and grouped by user ID, and each user is combined
    with their tickets into an attendee object.

    The returned pagination object is the one obtained for the users;
    only its ``items`` are replaced with the list of attendees.
    """
    pagination = _get_users_paginated(
        party_id, page, per_page, search_term=search_term
    )
    page_users = pagination.items

    # Fetch the tickets only for the users on this page.
    page_user_ids = {user.id for user in page_users}
    tickets_by_user_id = _index_tickets_by_user_id(
        _get_tickets_for_users(party_id, page_user_ids)
    )

    # Swap the user objects for attendee objects, keeping the
    # pagination object itself.
    pagination.items = list(
        _generate_attendees(page_users, tickets_by_user_id)
    )
    return pagination
41
42
43
def _get_users_paginated(
    party_id: PartyID,
    page: int,
    per_page: int,
    *,
    search_term: Optional[str] = None,
) -> Pagination:
    """Return one page of users who use a non-revoked ticket of the party.

    If a search term is given, only users whose screen name matches
    ``%<search_term>%`` via ``ilike`` are included. The result is
    ordered by the lowercased screen name.
    """
    # Revoked tickets are excluded at this point already so that users
    # who have no other ticket do not end up in the list.
    users_query = (
        DbUser.query
        .distinct()
        .options(
            db.load_only('id', 'screen_name', 'deleted'),
            db.joinedload('avatar_selection').joinedload('avatar'),
        )
        .join(DbTicket, DbTicket.used_by_id == DbUser.id)
        # `== False` is deliberate: the comparison is handed to
        # `filter()` as a query expression.
        .filter(DbTicket.revoked == False)
        .join(DbCategory)
        .filter(DbCategory.party_id == party_id)
    )

    if search_term:
        # NOTE(review): `%` and `_` inside the search term are not
        # escaped and thus act as wildcards -- confirm this is intended.
        pattern = f'%{search_term}%'
        users_query = users_query.filter(DbUser.screen_name.ilike(pattern))

    ordered_query = users_query.order_by(db.func.lower(DbUser.screen_name))

    return paginate(ordered_query, page, per_page)
70
71
72
def _get_tickets_for_users(
73
    party_id: PartyID, user_ids: Set[UserID]
74
) -> List[DbTicket]:
75
    return DbTicket.query \
76
        .options(
77
            db.joinedload('category'),
78
            db.joinedload('occupied_seat').joinedload('area'),
79
        ) \
80
        .for_party(party_id) \
81
        .filter(DbTicket.used_by_id.in_(user_ids)) \
82
        .filter(DbTicket.revoked == False) \
83
        .all()
84
85
86
def _index_tickets_by_user_id(
87
    tickets: Iterable[DbTicket]
88
) -> Dict[UserID, Set[DbTicket]]:
89
    tickets_by_user_id = defaultdict(set)
90
    for ticket in tickets:
91
        tickets_by_user_id[ticket.used_by_id].add(ticket)
92
    return tickets_by_user_id
93
94
95
def _generate_attendees(
    users: Iterable[DbUser], tickets_by_user_id: Dict[UserID, Set[DbTicket]]
) -> Iterable[Attendee]:
    """Yield an attendee for each user, in the order of the given users.

    Each attendee combines the user with the attendee tickets created
    from the tickets registered for that user's ID. A user whose ID is
    not part of the mapping gets an empty ticket list.
    """
    for user in users:
        # Use `.get()` instead of indexing: this also works with a
        # plain dict (no `KeyError`) and does not insert new keys into
        # a `defaultdict` as a side effect of the lookup.
        tickets = tickets_by_user_id.get(user.id, ())
        yield Attendee(user, _to_attendee_tickets(tickets))
102
103
104
def _to_attendee_tickets(tickets: Iterable[DbTicket]) -> List[AttendeeTicket]:
    """Convert the tickets to a sorted list of attendee tickets.

    Each attendee ticket is built from the ticket's occupied seat and
    its checked-in flag; the list is ordered by
    ``_get_attendee_ticket_sort_key``.
    """
    converted = (
        AttendeeTicket(ticket.occupied_seat, ticket.user_checked_in)
        for ticket in tickets
    )
    return sorted(converted, key=_get_attendee_ticket_sort_key)
110
111
112
def _get_attendee_ticket_sort_key(
113
    attendee_ticket: AttendeeTicket
114
) -> Tuple[bool, str, bool]:
115
    return (
116
        # List tickets with occupied seat first.
117
        attendee_ticket.seat is None,
118
119
        # Sort by seat label.
120
        attendee_ticket.seat.label if attendee_ticket.seat else None,
121
122
        # List checked in tickets first.
123
        not attendee_ticket.checked_in,
124
    )
125