1
|
|
|
""" |
2
|
|
|
byceps.services.orga_team.service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2020 Jochen Kupperschmidt |
6
|
|
|
:License: Modified BSD, see LICENSE for details. |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
import dataclasses |
10
|
1 |
|
from typing import Dict, Optional, Sequence, Set, Tuple |
11
|
|
|
|
12
|
1 |
|
from ...database import db |
13
|
1 |
|
from ...typing import PartyID, UserID |
14
|
|
|
|
15
|
1 |
|
from ..orga.models import OrgaFlag as DbOrgaFlag |
16
|
1 |
|
from ..party import service as party_service |
17
|
1 |
|
from ..user.models.user import User as DbUser |
18
|
1 |
|
from ..user import service as user_service |
19
|
1 |
|
from ..user.transfer.models import User |
20
|
|
|
|
21
|
1 |
|
from .models import Membership as DbMembership, OrgaTeam as DbOrgaTeam |
22
|
1 |
|
from .transfer.models import ( |
23
|
|
|
Member, |
24
|
|
|
Membership, |
25
|
|
|
MembershipID, |
26
|
|
|
OrgaActivity, |
27
|
|
|
OrgaTeam, |
28
|
|
|
OrgaTeamID, |
29
|
|
|
PublicOrga, |
30
|
|
|
) |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
# -------------------------------------------------------------------- # |
34
|
|
|
# teams |
35
|
|
|
|
36
|
|
|
|
37
|
1 |
|
def create_team(party_id: PartyID, title: str) -> OrgaTeam: |
38
|
|
|
"""Create an orga team for that party.""" |
39
|
|
|
team = DbOrgaTeam(party_id, title) |
40
|
|
|
|
41
|
|
|
db.session.add(team) |
42
|
|
|
db.session.commit() |
43
|
|
|
|
44
|
|
|
return _db_entity_to_team(team) |
45
|
|
|
|
46
|
|
|
|
47
|
1 |
|
def delete_team(team_id: OrgaTeamID) -> None: |
48
|
|
|
"""Delete the orga team.""" |
49
|
|
|
db.session.query(DbOrgaTeam) \ |
50
|
|
|
.filter_by(id=team_id) \ |
51
|
|
|
.delete() |
52
|
|
|
|
53
|
|
|
db.session.commit() |
54
|
|
|
|
55
|
|
|
|
56
|
1 |
|
def count_teams_for_party(party_id: PartyID) -> int: |
57
|
|
|
"""Return the number of orga teams for that party.""" |
58
|
|
|
return DbOrgaTeam.query \ |
59
|
|
|
.filter_by(party_id=party_id) \ |
60
|
|
|
.count() |
61
|
|
|
|
62
|
|
|
|
63
|
1 |
|
def get_teams_for_party(party_id: PartyID) -> Set[OrgaTeam]: |
64
|
|
|
"""Return orga teams for that party.""" |
65
|
|
|
teams = DbOrgaTeam.query \ |
66
|
|
|
.filter_by(party_id=party_id) \ |
67
|
|
|
.all() |
68
|
|
|
|
69
|
|
|
return {_db_entity_to_team(team) for team in teams} |
70
|
|
|
|
71
|
|
|
|
72
|
1 |
|
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]: |
73
|
|
|
"""Return the team with that id, or `None` if not found.""" |
74
|
|
|
team = _find_db_team(team_id) |
75
|
|
|
|
76
|
|
|
if team is None: |
77
|
|
|
return None |
78
|
|
|
|
79
|
|
|
return _db_entity_to_team(team) |
80
|
|
|
|
81
|
|
|
|
82
|
1 |
|
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]: |
83
|
|
|
"""Return the team with that id, or `None` if not found.""" |
84
|
|
|
return DbOrgaTeam.query.get(team_id) |
85
|
|
|
|
86
|
|
|
|
87
|
1 |
|
def get_teams_and_members_for_party( |
88
|
|
|
party_id: PartyID, |
89
|
|
|
) -> Sequence[Tuple[OrgaTeam, Set[Member]]]: |
90
|
|
|
"""Return all orga teams and their corresponding memberships for |
91
|
|
|
that party. |
92
|
|
|
""" |
93
|
|
|
|
94
|
|
|
teams = DbOrgaTeam.query \ |
95
|
|
|
.options(db.joinedload('memberships')) \ |
96
|
|
|
.filter_by(party_id=party_id) \ |
97
|
|
|
.all() |
98
|
|
|
|
99
|
|
|
user_ids = {ms.user_id for team in teams for ms in team.memberships} |
100
|
|
|
users = user_service.find_users(user_ids, include_avatars=True) |
101
|
|
|
users_by_id = {user.id: user for user in users} |
102
|
|
|
|
103
|
|
|
def to_member(db_membership: DbMembership) -> Member: |
104
|
|
|
membership = _db_entity_to_membership(db_membership) |
105
|
|
|
user = users_by_id[membership.user_id] |
106
|
|
|
|
107
|
|
|
return Member(membership, user) |
108
|
|
|
|
109
|
|
|
return [ |
110
|
|
|
(_db_entity_to_team(team), {to_member(ms) for ms in team.memberships}) |
111
|
|
|
for team in teams |
112
|
|
|
] |
113
|
|
|
|
114
|
|
|
|
115
|
1 |
|
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam: |
116
|
|
|
return OrgaTeam( |
117
|
|
|
team.id, |
118
|
|
|
team.party_id, |
119
|
|
|
team.title, |
120
|
|
|
) |
121
|
|
|
|
122
|
|
|
|
123
|
|
|
# -------------------------------------------------------------------- # |
124
|
|
|
# memberships |
125
|
|
|
|
126
|
|
|
|
127
|
1 |
|
def create_membership( |
128
|
|
|
team_id: OrgaTeamID, user_id: UserID, duties: str |
129
|
|
|
) -> Membership: |
130
|
|
|
"""Assign the user to the team.""" |
131
|
|
|
membership = DbMembership(team_id, user_id) |
132
|
|
|
|
133
|
|
|
if duties: |
134
|
|
|
membership.duties = duties |
135
|
|
|
|
136
|
|
|
db.session.add(membership) |
137
|
|
|
db.session.commit() |
138
|
|
|
|
139
|
|
|
return _db_entity_to_membership(membership) |
140
|
|
|
|
141
|
|
|
|
142
|
1 |
|
def update_membership( |
143
|
|
|
membership_id: MembershipID, team_id: OrgaTeamID, duties: str |
144
|
|
|
) -> None: |
145
|
|
|
"""Update the membership.""" |
146
|
|
|
membership = _find_db_membership(membership_id) |
147
|
|
|
if membership is None: |
148
|
|
|
raise ValueError(f"Unknown membership ID '{membership_id}'") |
149
|
|
|
|
150
|
|
|
team = _find_db_team(team_id) |
151
|
|
|
if team is None: |
152
|
|
|
raise ValueError(f"Unknown team ID '{team_id}'") |
153
|
|
|
|
154
|
|
|
membership.orga_team = team |
155
|
|
|
membership.duties = duties |
156
|
|
|
db.session.commit() |
157
|
|
|
|
158
|
|
|
|
159
|
1 |
|
def delete_membership(membership_id: MembershipID) -> None: |
160
|
|
|
"""Delete the membership.""" |
161
|
|
|
db.session.query(DbMembership) \ |
162
|
|
|
.filter_by(id=membership_id) \ |
163
|
|
|
.delete() |
164
|
|
|
|
165
|
|
|
db.session.commit() |
166
|
|
|
|
167
|
|
|
|
168
|
1 |
|
def count_memberships_for_party(party_id: PartyID) -> int: |
169
|
|
|
"""Return the number of memberships the party's teams have in total.""" |
170
|
|
|
return DbMembership.query \ |
171
|
|
|
.for_party(party_id) \ |
172
|
|
|
.count() |
173
|
|
|
|
174
|
|
|
|
175
|
1 |
|
def find_membership(membership_id: MembershipID) -> Optional[Membership]: |
176
|
|
|
"""Return the membership with that id, or `None` if not found.""" |
177
|
|
|
membership = _find_db_membership(membership_id) |
178
|
|
|
|
179
|
|
|
if membership is None: |
180
|
|
|
return None |
181
|
|
|
|
182
|
|
|
return _db_entity_to_membership(membership) |
183
|
|
|
|
184
|
|
|
|
185
|
1 |
|
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]: |
186
|
|
|
"""Return the membership with that id, or `None` if not found.""" |
187
|
|
|
return DbMembership.query.get(membership_id) |
188
|
|
|
|
189
|
|
|
|
190
|
1 |
|
def find_orga_team_for_user_and_party( |
191
|
|
|
user_id: UserID, party_id: PartyID |
192
|
|
|
) -> Optional[OrgaTeam]: |
193
|
|
|
"""Return the user's membership in an orga team of that party, or |
194
|
|
|
`None` of user it not part of an orga team for that party. |
195
|
|
|
""" |
196
|
1 |
|
return DbOrgaTeam.query \ |
197
|
|
|
.join(DbMembership) \ |
198
|
|
|
.filter(DbMembership.user_id == user_id) \ |
199
|
|
|
.filter(DbOrgaTeam.party_id == party_id) \ |
200
|
|
|
.one_or_none() |
201
|
|
|
|
202
|
|
|
|
203
|
1 |
|
def get_orga_activities_for_user(user_id: UserID) -> Set[OrgaActivity]: |
204
|
|
|
"""Return all orga team activities for that user.""" |
205
|
|
|
memberships = DbMembership.query \ |
206
|
|
|
.options( |
207
|
|
|
db.joinedload('orga_team').joinedload('party'), |
208
|
|
|
) \ |
209
|
|
|
.filter_by(user_id=user_id) \ |
210
|
|
|
.all() |
211
|
|
|
|
212
|
|
|
def to_activity(membership: DbMembership) -> OrgaActivity: |
213
|
|
|
party = party_service._db_entity_to_party(membership.orga_team.party) |
214
|
|
|
team = _db_entity_to_team(membership.orga_team) |
215
|
|
|
|
216
|
|
|
return OrgaActivity( |
217
|
|
|
membership.user_id, |
218
|
|
|
party, |
219
|
|
|
team, |
220
|
|
|
membership.duties, |
221
|
|
|
) |
222
|
|
|
|
223
|
|
|
return {to_activity(ms) for ms in memberships} |
224
|
|
|
|
225
|
|
|
|
226
|
1 |
|
def get_public_orgas_for_party(party_id: PartyID) -> Set[PublicOrga]: |
227
|
|
|
"""Return all public orgas for that party.""" |
228
|
|
|
memberships = DbMembership.query \ |
229
|
|
|
.for_party(party_id) \ |
230
|
|
|
.options( |
231
|
|
|
db.joinedload('orga_team'), |
232
|
|
|
db.joinedload('user') |
233
|
|
|
.load_only('id'), |
234
|
|
|
db.joinedload('user') |
235
|
|
|
.joinedload('detail') |
236
|
|
|
.load_only('first_names', 'last_name'), |
237
|
|
|
) \ |
238
|
|
|
.all() |
239
|
|
|
|
240
|
|
|
users_by_id = _get_public_orga_users_by_id(memberships) |
241
|
|
|
|
242
|
|
|
def to_orga(membership: DbMembership) -> PublicOrga: |
243
|
|
|
user = users_by_id[membership.user_id] |
244
|
|
|
|
245
|
|
|
return PublicOrga( |
246
|
|
|
user, |
247
|
|
|
membership.user.detail.full_name, |
248
|
|
|
membership.orga_team.title, |
249
|
|
|
membership.duties, |
250
|
|
|
) |
251
|
|
|
|
252
|
|
|
orgas = {to_orga(ms) for ms in memberships} |
253
|
|
|
orgas = {orga for orga in orgas if not orga.user.deleted} |
254
|
|
|
|
255
|
|
|
return orgas |
256
|
|
|
|
257
|
|
|
|
258
|
1 |
|
def _get_public_orga_users_by_id( |
259
|
|
|
memberships: DbMembership, |
260
|
|
|
) -> Dict[UserID, User]: |
261
|
|
|
user_ids = {ms.user_id for ms in memberships} |
262
|
|
|
|
263
|
|
|
users = user_service.find_users(user_ids, include_avatars=True) |
264
|
|
|
|
265
|
|
|
# Each of these users is an organizer. |
266
|
|
|
users = {dataclasses.replace(u, is_orga=True) for u in users} |
267
|
|
|
|
268
|
|
|
return {user.id: user for user in users} |
269
|
|
|
|
270
|
|
|
|
271
|
1 |
|
def has_team_memberships(team_id: OrgaTeamID) -> bool: |
272
|
|
|
"""Return `True` if the team has memberships.""" |
273
|
|
|
return db.session \ |
274
|
|
|
.query( |
275
|
|
|
db.session |
276
|
|
|
.query(DbMembership) |
277
|
|
|
.filter(DbMembership.orga_team_id == team_id) |
278
|
|
|
.exists() |
279
|
|
|
) \ |
280
|
|
|
.scalar() |
281
|
|
|
|
282
|
|
|
|
283
|
1 |
|
def _db_entity_to_membership(membership: DbMembership) -> Membership: |
284
|
|
|
return Membership( |
285
|
|
|
membership.id, |
286
|
|
|
membership.orga_team_id, |
287
|
|
|
membership.user_id, |
288
|
|
|
membership.duties, |
289
|
|
|
) |
290
|
|
|
|
291
|
|
|
|
292
|
|
|
# -------------------------------------------------------------------- # |
293
|
|
|
# organizers |
294
|
|
|
|
295
|
|
|
|
296
|
1 |
|
def get_unassigned_orgas_for_party(party_id: PartyID) -> Set[User]: |
297
|
|
|
"""Return organizers that are not assigned to a team for the party.""" |
298
|
|
|
party = party_service.get_party(party_id) |
299
|
|
|
|
300
|
|
|
assigned_orgas = DbUser.query \ |
301
|
|
|
.join(DbMembership) \ |
302
|
|
|
.join(DbOrgaTeam) \ |
303
|
|
|
.filter(DbOrgaTeam.party_id == party.id) \ |
304
|
|
|
.options(db.load_only(DbUser.id)) \ |
305
|
|
|
.all() |
306
|
|
|
assigned_orga_ids = frozenset(user.id for user in assigned_orgas) |
307
|
|
|
|
308
|
|
|
unassigned_orga_ids_query = db.session \ |
309
|
|
|
.query(DbUser.id) |
310
|
|
|
|
311
|
|
|
if assigned_orga_ids: |
312
|
|
|
unassigned_orga_ids_query = unassigned_orga_ids_query \ |
313
|
|
|
.filter(db.not_(DbUser.id.in_(assigned_orga_ids))) |
314
|
|
|
|
315
|
|
|
unassigned_orga_ids = unassigned_orga_ids_query \ |
316
|
|
|
.filter_by(deleted=False) \ |
317
|
|
|
.join(DbOrgaFlag).filter(DbOrgaFlag.brand_id == party.brand_id) \ |
318
|
|
|
.all() |
319
|
|
|
|
320
|
|
|
return user_service.find_users(unassigned_orga_ids) |
321
|
|
|
|
322
|
|
|
|
323
|
1 |
|
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool: |
324
|
|
|
"""Return `True` if the user is an organizer (i.e. is member of an |
325
|
|
|
organizer team) of that party. |
326
|
|
|
""" |
327
|
1 |
|
return db.session \ |
328
|
|
|
.query( |
329
|
|
|
db.session |
330
|
|
|
.query(DbMembership) |
331
|
|
|
.filter(DbMembership.user_id == user_id) |
332
|
|
|
.join(DbOrgaTeam) |
333
|
|
|
.filter(DbOrgaTeam.party_id == party_id) |
334
|
|
|
.exists() |
335
|
|
|
) \ |
336
|
|
|
.scalar() |
337
|
|
|
|