Completed
Push — main ( 2a7d5a...4a0412 )
by Jochen
05:23
created

byceps.services.orga_team.service.get_public_orgas_for_party()   A

Complexity

Conditions 1

Size

Total Lines 30
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.7023

Importance

Changes 0
Metric Value
cc 1
eloc 22
nop 1
dl 0
loc 30
ccs 1
cts 9
cp 0.1111
crap 1.7023
rs 9.352
c 0
b 0
f 0
1
"""
2
byceps.services.orga_team.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
import dataclasses
10 1
from typing import Dict, Optional, Sequence, Set, Tuple
11
12 1
from ...database import db
13 1
from ...typing import PartyID, UserID
14
15 1
from ..orga.models import OrgaFlag as DbOrgaFlag
16 1
from ..party import service as party_service
17 1
from ..user.models.user import User as DbUser
18 1
from ..user import service as user_service
19 1
from ..user.transfer.models import User
20
21 1
from .models import Membership as DbMembership, OrgaTeam as DbOrgaTeam
22 1
from .transfer.models import (
23
    Membership,
24
    MembershipID,
25
    OrgaTeam,
26
    OrgaTeamID,
27
    PublicOrga,
28
)
29
30
31
# -------------------------------------------------------------------- #
32
# teams
33
34
35 1
def create_team(party_id: PartyID, title: str) -> OrgaTeam:
36
    """Create an orga team for that party."""
37
    team = DbOrgaTeam(party_id, title)
38
39
    db.session.add(team)
40
    db.session.commit()
41
42
    return _db_entity_to_team(team)
43
44
45 1
def delete_team(team_id: OrgaTeamID) -> None:
46
    """Delete the orga team."""
47
    db.session.query(DbOrgaTeam) \
48
        .filter_by(id=team_id) \
49
        .delete()
50
51
    db.session.commit()
52
53
54 1
def count_teams_for_party(party_id: PartyID) -> int:
55
    """Return the number of orga teams for that party."""
56
    return DbOrgaTeam.query \
57
        .filter_by(party_id=party_id) \
58
        .count()
59
60
61 1
def get_teams_for_party(party_id: PartyID) -> Set[OrgaTeam]:
62
    """Return orga teams for that party."""
63
    teams = DbOrgaTeam.query \
64
        .filter_by(party_id=party_id) \
65
        .all()
66
67
    return {_db_entity_to_team(team) for team in teams}
68
69
70 1
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]:
71
    """Return the team with that id, or `None` if not found."""
72
    team = _find_db_team(team_id)
73
74
    if team is None:
75
        return None
76
77
    return _db_entity_to_team(team)
78
79
80 1
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]:
81
    """Return the team with that id, or `None` if not found."""
82
    return DbOrgaTeam.query.get(team_id)
83
84
85 1
def get_teams_and_memberships_for_party(
86
    party_id: PartyID
87
) -> Sequence[Tuple[OrgaTeam, DbMembership]]:
88
    """Return all orga teams and their corresponding memberships for
89
    that party.
90
    """
91
92
    teams = DbOrgaTeam.query \
93
        .options(db.joinedload('memberships')) \
94
        .filter_by(party_id=party_id) \
95
        .all()
96
97
    return [(_db_entity_to_team(team), team.memberships) for team in teams]
98
99
100 1
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam:
101
    return OrgaTeam(
102
        team.id,
103
        team.party_id,
104
        team.title,
105
    )
106
107
108
# -------------------------------------------------------------------- #
109
# memberships
110
111
112 1
def create_membership(
113
    team_id: OrgaTeamID, user_id: UserID, duties: str
114
) -> Membership:
115
    """Assign the user to the team."""
116
    membership = DbMembership(team_id, user_id)
117
118
    if duties:
119
        membership.duties = duties
120
121
    db.session.add(membership)
122
    db.session.commit()
123
124
    return _db_entity_to_membership(membership)
125
126
127 1
def update_membership(
128
    membership_id: MembershipID, team_id: OrgaTeamID, duties: str
129
) -> None:
130
    """Update the membership."""
131
    membership = _find_db_membership(membership_id)
132
    if membership is None:
133
        raise ValueError(f"Unknown membership ID '{membership_id}'")
134
135
    team = _find_db_team(team_id)
136
    if team is None:
137
        raise ValueError(f"Unknown team ID '{team_id}'")
138
139
    membership.orga_team = team
140
    membership.duties = duties
141
    db.session.commit()
142
143
144 1
def delete_membership(membership_id: MembershipID) -> None:
145
    """Delete the membership."""
146
    db.session.query(DbMembership) \
147
        .filter_by(id=membership_id) \
148
        .delete()
149
150
    db.session.commit()
151
152
153 1
def count_memberships_for_party(party_id: PartyID) -> int:
154
    """Return the number of memberships the party's teams have in total."""
155
    return DbMembership.query \
156
        .for_party(party_id) \
157
        .count()
158
159
160 1
def find_membership(membership_id: MembershipID) -> Optional[DbMembership]:
161
    """Return the membership with that id, or `None` if not found."""
162
    return _find_db_membership(membership_id)
163
164
165 1
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]:
166
    """Return the membership with that id, or `None` if not found."""
167
    return DbMembership.query.get(membership_id)
168
169
170 1
def find_membership_for_party(
171
    user_id: UserID, party_id: PartyID
172
) -> Optional[DbMembership]:
173
    """Return the user's membership in an orga team of that party, or
174
    `None` of user it not part of an orga team for that party.
175
    """
176 1
    return DbMembership.query \
177
        .filter_by(user_id=user_id) \
178
        .for_party(party_id) \
179
        .one_or_none()
180
181
182 1
def get_memberships_for_user(user_id: UserID) -> Sequence[DbMembership]:
183
    """Return all orga team memberships for that user."""
184
    return DbMembership.query \
185
        .options(
186
            db.joinedload('orga_team').joinedload('party'),
187
        ) \
188
        .filter_by(user_id=user_id) \
189
        .all()
190
191
192 1
def get_public_orgas_for_party(party_id: PartyID) -> Set[PublicOrga]:
193
    """Return all public orgas for that party."""
194
    memberships = DbMembership.query \
195
        .for_party(party_id) \
196
        .options(
197
            db.joinedload('orga_team'),
198
            db.joinedload('user')
199
                .load_only('id'),
200
            db.joinedload('user')
201
                .joinedload('detail')
202
                .load_only('first_names', 'last_name'),
203
        ) \
204
        .all()
205
206
    users_by_id = _get_public_orga_users_by_id(memberships)
207
208
    def _to_orga(membership):
209
        user = users_by_id[membership.user_id]
210
211
        return PublicOrga(
212
            user,
213
            membership.user.detail.full_name,
214
            membership.orga_team.title,
215
            membership.duties,
216
        )
217
218
    orgas = {_to_orga(ms) for ms in memberships}
219
    orgas = {orga for orga in orgas if not orga.user.deleted}
220
221
    return orgas
222
223
224 1
def _get_public_orga_users_by_id(memberships: DbMembership) -> Dict[UserID, User]:
225
    user_ids = {ms.user_id for ms in memberships}
226
227
    users = user_service.find_users(user_ids, include_avatars=True)
228
229
    # Each of these users is an organizer.
230
    users = {dataclasses.replace(u, is_orga=True) for u in users}
231
232
    return {user.id: user for user in users}
233
234
235 1
def has_team_memberships(team_id: OrgaTeamID) -> bool:
236
    """Return `True` if the team has memberships."""
237
    return db.session \
238
        .query(
239
            db.session
240
                .query(DbMembership)
241
                .filter(DbMembership.orga_team_id == team_id)
242
                .exists()
243
        ) \
244
        .scalar()
245
246
247 1
def _db_entity_to_membership(membership: DbMembership) -> Membership:
248
    return Membership(
249
        membership.id,
250
        membership.orga_team_id,
251
        membership.user,
252
        membership.duties,
253
    )
254
255
256
# -------------------------------------------------------------------- #
257
# organizers
258
259
260 1
def get_unassigned_orgas_for_party(party_id: PartyID) -> Set[User]:
261
    """Return organizers that are not assigned to a team for the party."""
262
    party = party_service.get_party(party_id)
263
264
    assigned_orgas = DbUser.query \
265
        .join(DbMembership) \
266
        .join(DbOrgaTeam) \
267
        .filter(DbOrgaTeam.party_id == party.id) \
268
        .options(db.load_only(DbUser.id)) \
269
        .all()
270
    assigned_orga_ids = frozenset(user.id for user in assigned_orgas)
271
272
    unassigned_orga_ids_query = db.session \
273
        .query(DbUser.id)
274
275
    if assigned_orga_ids:
276
        unassigned_orga_ids_query = unassigned_orga_ids_query \
277
            .filter(db.not_(DbUser.id.in_(assigned_orga_ids)))
278
279
    unassigned_orga_ids = unassigned_orga_ids_query \
280
        .filter_by(deleted=False) \
281
        .join(DbOrgaFlag).filter(DbOrgaFlag.brand_id == party.brand_id) \
282
        .all()
283
284
    return user_service.find_users(unassigned_orga_ids)
285
286
287 1
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool:
288
    """Return `True` if the user is an organizer (i.e. is member of an
289
    organizer team) of that party.
290
    """
291 1
    return db.session \
292
        .query(
293
            db.session
294
                .query(DbMembership)
295
                .filter(DbMembership.user_id == user_id)
296
                .join(DbOrgaTeam)
297
                .filter(DbOrgaTeam.party_id == party_id)
298
                .exists()
299
        ) \
300
        .scalar()
301