1
|
|
|
""" |
2
|
|
|
byceps.services.orga_team.service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2020 Jochen Kupperschmidt |
6
|
|
|
:License: Modified BSD, see LICENSE for details. |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
import dataclasses |
10
|
1 |
|
from typing import Dict, Optional, Sequence, Set, Tuple |
11
|
|
|
|
12
|
1 |
|
from ...database import db |
13
|
1 |
|
from ...typing import PartyID, UserID |
14
|
|
|
|
15
|
1 |
|
from ..orga.models import OrgaFlag as DbOrgaFlag |
16
|
1 |
|
from ..party import service as party_service |
17
|
1 |
|
from ..user.models.user import User as DbUser |
18
|
1 |
|
from ..user import service as user_service |
19
|
1 |
|
from ..user.transfer.models import User |
20
|
|
|
|
21
|
1 |
|
from .models import Membership as DbMembership, OrgaTeam as DbOrgaTeam |
22
|
1 |
|
from .transfer.models import ( |
23
|
|
|
Membership, |
24
|
|
|
MembershipID, |
25
|
|
|
OrgaTeam, |
26
|
|
|
OrgaTeamID, |
27
|
|
|
PublicOrga, |
28
|
|
|
) |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
# -------------------------------------------------------------------- # |
32
|
|
|
# teams |
33
|
|
|
|
34
|
|
|
|
35
|
1 |
|
def create_team(party_id: PartyID, title: str) -> OrgaTeam: |
36
|
|
|
"""Create an orga team for that party.""" |
37
|
|
|
team = DbOrgaTeam(party_id, title) |
38
|
|
|
|
39
|
|
|
db.session.add(team) |
40
|
|
|
db.session.commit() |
41
|
|
|
|
42
|
|
|
return _db_entity_to_team(team) |
43
|
|
|
|
44
|
|
|
|
45
|
1 |
|
def delete_team(team_id: OrgaTeamID) -> None: |
46
|
|
|
"""Delete the orga team.""" |
47
|
|
|
db.session.query(DbOrgaTeam) \ |
48
|
|
|
.filter_by(id=team_id) \ |
49
|
|
|
.delete() |
50
|
|
|
|
51
|
|
|
db.session.commit() |
52
|
|
|
|
53
|
|
|
|
54
|
1 |
|
def count_teams_for_party(party_id: PartyID) -> int: |
55
|
|
|
"""Return the number of orga teams for that party.""" |
56
|
|
|
return DbOrgaTeam.query \ |
57
|
|
|
.filter_by(party_id=party_id) \ |
58
|
|
|
.count() |
59
|
|
|
|
60
|
|
|
|
61
|
1 |
|
def get_teams_for_party(party_id: PartyID) -> Set[OrgaTeam]: |
62
|
|
|
"""Return orga teams for that party.""" |
63
|
|
|
teams = DbOrgaTeam.query \ |
64
|
|
|
.filter_by(party_id=party_id) \ |
65
|
|
|
.all() |
66
|
|
|
|
67
|
|
|
return {_db_entity_to_team(team) for team in teams} |
68
|
|
|
|
69
|
|
|
|
70
|
1 |
|
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]: |
71
|
|
|
"""Return the team with that id, or `None` if not found.""" |
72
|
|
|
team = _find_db_team(team_id) |
73
|
|
|
|
74
|
|
|
if team is None: |
75
|
|
|
return None |
76
|
|
|
|
77
|
|
|
return _db_entity_to_team(team) |
78
|
|
|
|
79
|
|
|
|
80
|
1 |
|
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]: |
81
|
|
|
"""Return the team with that id, or `None` if not found.""" |
82
|
|
|
return DbOrgaTeam.query.get(team_id) |
83
|
|
|
|
84
|
|
|
|
85
|
1 |
|
def get_teams_and_memberships_for_party( |
86
|
|
|
party_id: PartyID |
87
|
|
|
) -> Sequence[Tuple[OrgaTeam, DbMembership]]: |
88
|
|
|
"""Return all orga teams and their corresponding memberships for |
89
|
|
|
that party. |
90
|
|
|
""" |
91
|
|
|
|
92
|
|
|
teams = DbOrgaTeam.query \ |
93
|
|
|
.options(db.joinedload('memberships')) \ |
94
|
|
|
.filter_by(party_id=party_id) \ |
95
|
|
|
.all() |
96
|
|
|
|
97
|
|
|
return [(_db_entity_to_team(team), team.memberships) for team in teams] |
98
|
|
|
|
99
|
|
|
|
100
|
1 |
|
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam: |
101
|
|
|
return OrgaTeam( |
102
|
|
|
team.id, |
103
|
|
|
team.party_id, |
104
|
|
|
team.title, |
105
|
|
|
) |
106
|
|
|
|
107
|
|
|
|
108
|
|
|
# -------------------------------------------------------------------- # |
109
|
|
|
# memberships |
110
|
|
|
|
111
|
|
|
|
112
|
1 |
|
def create_membership( |
113
|
|
|
team_id: OrgaTeamID, user_id: UserID, duties: str |
114
|
|
|
) -> Membership: |
115
|
|
|
"""Assign the user to the team.""" |
116
|
|
|
membership = DbMembership(team_id, user_id) |
117
|
|
|
|
118
|
|
|
if duties: |
119
|
|
|
membership.duties = duties |
120
|
|
|
|
121
|
|
|
db.session.add(membership) |
122
|
|
|
db.session.commit() |
123
|
|
|
|
124
|
|
|
return _db_entity_to_membership(membership) |
125
|
|
|
|
126
|
|
|
|
127
|
1 |
|
def update_membership( |
128
|
|
|
membership_id: MembershipID, team_id: OrgaTeamID, duties: str |
129
|
|
|
) -> None: |
130
|
|
|
"""Update the membership.""" |
131
|
|
|
membership = _find_db_membership(membership_id) |
132
|
|
|
if membership is None: |
133
|
|
|
raise ValueError(f"Unknown membership ID '{membership_id}'") |
134
|
|
|
|
135
|
|
|
team = _find_db_team(team_id) |
136
|
|
|
if team is None: |
137
|
|
|
raise ValueError(f"Unknown team ID '{team_id}'") |
138
|
|
|
|
139
|
|
|
membership.orga_team = team |
140
|
|
|
membership.duties = duties |
141
|
|
|
db.session.commit() |
142
|
|
|
|
143
|
|
|
|
144
|
1 |
|
def delete_membership(membership_id: MembershipID) -> None: |
145
|
|
|
"""Delete the membership.""" |
146
|
|
|
db.session.query(DbMembership) \ |
147
|
|
|
.filter_by(id=membership_id) \ |
148
|
|
|
.delete() |
149
|
|
|
|
150
|
|
|
db.session.commit() |
151
|
|
|
|
152
|
|
|
|
153
|
1 |
|
def count_memberships_for_party(party_id: PartyID) -> int: |
154
|
|
|
"""Return the number of memberships the party's teams have in total.""" |
155
|
|
|
return DbMembership.query \ |
156
|
|
|
.for_party(party_id) \ |
157
|
|
|
.count() |
158
|
|
|
|
159
|
|
|
|
160
|
1 |
|
def find_membership(membership_id: MembershipID) -> Optional[DbMembership]: |
161
|
|
|
"""Return the membership with that id, or `None` if not found.""" |
162
|
|
|
return _find_db_membership(membership_id) |
163
|
|
|
|
164
|
|
|
|
165
|
1 |
|
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]: |
166
|
|
|
"""Return the membership with that id, or `None` if not found.""" |
167
|
|
|
return DbMembership.query.get(membership_id) |
168
|
|
|
|
169
|
|
|
|
170
|
1 |
|
def find_membership_for_party( |
171
|
|
|
user_id: UserID, party_id: PartyID |
172
|
|
|
) -> Optional[DbMembership]: |
173
|
|
|
"""Return the user's membership in an orga team of that party, or |
174
|
|
|
`None` of user it not part of an orga team for that party. |
175
|
|
|
""" |
176
|
1 |
|
return DbMembership.query \ |
177
|
|
|
.filter_by(user_id=user_id) \ |
178
|
|
|
.for_party(party_id) \ |
179
|
|
|
.one_or_none() |
180
|
|
|
|
181
|
|
|
|
182
|
1 |
|
def get_memberships_for_user(user_id: UserID) -> Sequence[DbMembership]: |
183
|
|
|
"""Return all orga team memberships for that user.""" |
184
|
|
|
return DbMembership.query \ |
185
|
|
|
.options( |
186
|
|
|
db.joinedload('orga_team').joinedload('party'), |
187
|
|
|
) \ |
188
|
|
|
.filter_by(user_id=user_id) \ |
189
|
|
|
.all() |
190
|
|
|
|
191
|
|
|
|
192
|
1 |
|
def get_public_orgas_for_party(party_id: PartyID) -> Set[PublicOrga]: |
193
|
|
|
"""Return all public orgas for that party.""" |
194
|
|
|
memberships = DbMembership.query \ |
195
|
|
|
.for_party(party_id) \ |
196
|
|
|
.options( |
197
|
|
|
db.joinedload('orga_team'), |
198
|
|
|
db.joinedload('user') |
199
|
|
|
.load_only('id'), |
200
|
|
|
db.joinedload('user') |
201
|
|
|
.joinedload('detail') |
202
|
|
|
.load_only('first_names', 'last_name'), |
203
|
|
|
) \ |
204
|
|
|
.all() |
205
|
|
|
|
206
|
|
|
users_by_id = _get_public_orga_users_by_id(memberships) |
207
|
|
|
|
208
|
|
|
def _to_orga(membership): |
209
|
|
|
user = users_by_id[membership.user_id] |
210
|
|
|
|
211
|
|
|
return PublicOrga( |
212
|
|
|
user, |
213
|
|
|
membership.user.detail.full_name, |
214
|
|
|
membership.orga_team.title, |
215
|
|
|
membership.duties, |
216
|
|
|
) |
217
|
|
|
|
218
|
|
|
orgas = {_to_orga(ms) for ms in memberships} |
219
|
|
|
orgas = {orga for orga in orgas if not orga.user.deleted} |
220
|
|
|
|
221
|
|
|
return orgas |
222
|
|
|
|
223
|
|
|
|
224
|
1 |
|
def _get_public_orga_users_by_id(memberships: DbMembership) -> Dict[UserID, User]: |
225
|
|
|
user_ids = {ms.user_id for ms in memberships} |
226
|
|
|
|
227
|
|
|
users = user_service.find_users(user_ids, include_avatars=True) |
228
|
|
|
|
229
|
|
|
# Each of these users is an organizer. |
230
|
|
|
users = {dataclasses.replace(u, is_orga=True) for u in users} |
231
|
|
|
|
232
|
|
|
return {user.id: user for user in users} |
233
|
|
|
|
234
|
|
|
|
235
|
1 |
|
def has_team_memberships(team_id: OrgaTeamID) -> bool: |
236
|
|
|
"""Return `True` if the team has memberships.""" |
237
|
|
|
return db.session \ |
238
|
|
|
.query( |
239
|
|
|
db.session |
240
|
|
|
.query(DbMembership) |
241
|
|
|
.filter(DbMembership.orga_team_id == team_id) |
242
|
|
|
.exists() |
243
|
|
|
) \ |
244
|
|
|
.scalar() |
245
|
|
|
|
246
|
|
|
|
247
|
1 |
|
def _db_entity_to_membership(membership: DbMembership) -> Membership: |
248
|
|
|
return Membership( |
249
|
|
|
membership.id, |
250
|
|
|
membership.orga_team_id, |
251
|
|
|
membership.user, |
252
|
|
|
membership.duties, |
253
|
|
|
) |
254
|
|
|
|
255
|
|
|
|
256
|
|
|
# -------------------------------------------------------------------- # |
257
|
|
|
# organizers |
258
|
|
|
|
259
|
|
|
|
260
|
1 |
|
def get_unassigned_orgas_for_party(party_id: PartyID) -> Set[User]: |
261
|
|
|
"""Return organizers that are not assigned to a team for the party.""" |
262
|
|
|
party = party_service.get_party(party_id) |
263
|
|
|
|
264
|
|
|
assigned_orgas = DbUser.query \ |
265
|
|
|
.join(DbMembership) \ |
266
|
|
|
.join(DbOrgaTeam) \ |
267
|
|
|
.filter(DbOrgaTeam.party_id == party.id) \ |
268
|
|
|
.options(db.load_only(DbUser.id)) \ |
269
|
|
|
.all() |
270
|
|
|
assigned_orga_ids = frozenset(user.id for user in assigned_orgas) |
271
|
|
|
|
272
|
|
|
unassigned_orga_ids_query = db.session \ |
273
|
|
|
.query(DbUser.id) |
274
|
|
|
|
275
|
|
|
if assigned_orga_ids: |
276
|
|
|
unassigned_orga_ids_query = unassigned_orga_ids_query \ |
277
|
|
|
.filter(db.not_(DbUser.id.in_(assigned_orga_ids))) |
278
|
|
|
|
279
|
|
|
unassigned_orga_ids = unassigned_orga_ids_query \ |
280
|
|
|
.filter_by(deleted=False) \ |
281
|
|
|
.join(DbOrgaFlag).filter(DbOrgaFlag.brand_id == party.brand_id) \ |
282
|
|
|
.all() |
283
|
|
|
|
284
|
|
|
return user_service.find_users(unassigned_orga_ids) |
285
|
|
|
|
286
|
|
|
|
287
|
1 |
|
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool: |
288
|
|
|
"""Return `True` if the user is an organizer (i.e. is member of an |
289
|
|
|
organizer team) of that party. |
290
|
|
|
""" |
291
|
1 |
|
return db.session \ |
292
|
|
|
.query( |
293
|
|
|
db.session |
294
|
|
|
.query(DbMembership) |
295
|
|
|
.filter(DbMembership.user_id == user_id) |
296
|
|
|
.join(DbOrgaTeam) |
297
|
|
|
.filter(DbOrgaTeam.party_id == party_id) |
298
|
|
|
.exists() |
299
|
|
|
) \ |
300
|
|
|
.scalar() |
301
|
|
|
|