Completed
Push — main ( 4a0412...b71682 )
by Jochen
03:11
created

byceps.services.orga_team.service.find_orga_team_for_user_and_party()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 2
dl 0
loc 11
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.orga_team.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
import dataclasses
10 1
from typing import Dict, Optional, Sequence, Set, Tuple
11
12 1
from ...database import db
13 1
from ...typing import PartyID, UserID
14
15 1
from ..orga.models import OrgaFlag as DbOrgaFlag
16 1
from ..party import service as party_service
17 1
from ..user.models.user import User as DbUser
18 1
from ..user import service as user_service
19 1
from ..user.transfer.models import User
20
21 1
from .models import Membership as DbMembership, OrgaTeam as DbOrgaTeam
22 1
from .transfer.models import (
23
    Member,
24
    Membership,
25
    MembershipID,
26
    OrgaActivity,
27
    OrgaTeam,
28
    OrgaTeamID,
29
    PublicOrga,
30
)
31
32
33
# -------------------------------------------------------------------- #
34
# teams
35
36
37 1
def create_team(party_id: PartyID, title: str) -> OrgaTeam:
38
    """Create an orga team for that party."""
39
    team = DbOrgaTeam(party_id, title)
40
41
    db.session.add(team)
42
    db.session.commit()
43
44
    return _db_entity_to_team(team)
45
46
47 1
def delete_team(team_id: OrgaTeamID) -> None:
48
    """Delete the orga team."""
49
    db.session.query(DbOrgaTeam) \
50
        .filter_by(id=team_id) \
51
        .delete()
52
53
    db.session.commit()
54
55
56 1
def count_teams_for_party(party_id: PartyID) -> int:
57
    """Return the number of orga teams for that party."""
58
    return DbOrgaTeam.query \
59
        .filter_by(party_id=party_id) \
60
        .count()
61
62
63 1
def get_teams_for_party(party_id: PartyID) -> Set[OrgaTeam]:
64
    """Return orga teams for that party."""
65
    teams = DbOrgaTeam.query \
66
        .filter_by(party_id=party_id) \
67
        .all()
68
69
    return {_db_entity_to_team(team) for team in teams}
70
71
72 1
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]:
73
    """Return the team with that id, or `None` if not found."""
74
    team = _find_db_team(team_id)
75
76
    if team is None:
77
        return None
78
79
    return _db_entity_to_team(team)
80
81
82 1
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]:
83
    """Return the team with that id, or `None` if not found."""
84
    return DbOrgaTeam.query.get(team_id)
85
86
87 1
def get_teams_and_members_for_party(
88
    party_id: PartyID,
89
) -> Sequence[Tuple[OrgaTeam, Set[Member]]]:
90
    """Return all orga teams and their corresponding memberships for
91
    that party.
92
    """
93
94
    teams = DbOrgaTeam.query \
95
        .options(db.joinedload('memberships')) \
96
        .filter_by(party_id=party_id) \
97
        .all()
98
99
    user_ids = {ms.user_id for team in teams for ms in team.memberships}
100
    users = user_service.find_users(user_ids, include_avatars=True)
101
    users_by_id = {user.id: user for user in users}
102
103
    def to_member(db_membership: DbMembership) -> Member:
104
        membership = _db_entity_to_membership(db_membership)
105
        user = users_by_id[membership.user_id]
106
107
        return Member(membership, user)
108
109
    return [
110
        (_db_entity_to_team(team), {to_member(ms) for ms in team.memberships})
111
        for team in teams
112
    ]
113
114
115 1
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam:
116
    return OrgaTeam(
117
        team.id,
118
        team.party_id,
119
        team.title,
120
    )
121
122
123
# -------------------------------------------------------------------- #
124
# memberships
125
126
127 1
def create_membership(
128
    team_id: OrgaTeamID, user_id: UserID, duties: str
129
) -> Membership:
130
    """Assign the user to the team."""
131
    membership = DbMembership(team_id, user_id)
132
133
    if duties:
134
        membership.duties = duties
135
136
    db.session.add(membership)
137
    db.session.commit()
138
139
    return _db_entity_to_membership(membership)
140
141
142 1
def update_membership(
143
    membership_id: MembershipID, team_id: OrgaTeamID, duties: str
144
) -> None:
145
    """Update the membership."""
146
    membership = _find_db_membership(membership_id)
147
    if membership is None:
148
        raise ValueError(f"Unknown membership ID '{membership_id}'")
149
150
    team = _find_db_team(team_id)
151
    if team is None:
152
        raise ValueError(f"Unknown team ID '{team_id}'")
153
154
    membership.orga_team = team
155
    membership.duties = duties
156
    db.session.commit()
157
158
159 1
def delete_membership(membership_id: MembershipID) -> None:
160
    """Delete the membership."""
161
    db.session.query(DbMembership) \
162
        .filter_by(id=membership_id) \
163
        .delete()
164
165
    db.session.commit()
166
167
168 1
def count_memberships_for_party(party_id: PartyID) -> int:
169
    """Return the number of memberships the party's teams have in total."""
170
    return DbMembership.query \
171
        .for_party(party_id) \
172
        .count()
173
174
175 1
def find_membership(membership_id: MembershipID) -> Optional[Membership]:
176
    """Return the membership with that id, or `None` if not found."""
177
    membership = _find_db_membership(membership_id)
178
179
    if membership is None:
180
        return None
181
182
    return _db_entity_to_membership(membership)
183
184
185 1
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]:
186
    """Return the membership with that id, or `None` if not found."""
187
    return DbMembership.query.get(membership_id)
188
189
190 1
def find_orga_team_for_user_and_party(
191
    user_id: UserID, party_id: PartyID
192
) -> Optional[OrgaTeam]:
193
    """Return the user's membership in an orga team of that party, or
194
    `None` of user it not part of an orga team for that party.
195
    """
196 1
    return DbOrgaTeam.query \
197
        .join(DbMembership) \
198
        .filter(DbMembership.user_id == user_id) \
199
        .filter(DbOrgaTeam.party_id == party_id) \
200
        .one_or_none()
201
202
203 1
def get_orga_activities_for_user(user_id: UserID) -> Set[OrgaActivity]:
204
    """Return all orga team activities for that user."""
205
    memberships = DbMembership.query \
206
        .options(
207
            db.joinedload('orga_team').joinedload('party'),
208
        ) \
209
        .filter_by(user_id=user_id) \
210
        .all()
211
212
    def to_activity(membership: DbMembership) -> OrgaActivity:
213
        party = party_service._db_entity_to_party(membership.orga_team.party)
214
        team = _db_entity_to_team(membership.orga_team)
215
216
        return OrgaActivity(
217
            membership.user_id,
218
            party,
219
            team,
220
            membership.duties,
221
        )
222
223
    return {to_activity(ms) for ms in memberships}
224
225
226 1
def get_public_orgas_for_party(party_id: PartyID) -> Set[PublicOrga]:
227
    """Return all public orgas for that party."""
228
    memberships = DbMembership.query \
229
        .for_party(party_id) \
230
        .options(
231
            db.joinedload('orga_team'),
232
            db.joinedload('user')
233
                .load_only('id'),
234
            db.joinedload('user')
235
                .joinedload('detail')
236
                .load_only('first_names', 'last_name'),
237
        ) \
238
        .all()
239
240
    users_by_id = _get_public_orga_users_by_id(memberships)
241
242
    def to_orga(membership: DbMembership) -> PublicOrga:
243
        user = users_by_id[membership.user_id]
244
245
        return PublicOrga(
246
            user,
247
            membership.user.detail.full_name,
248
            membership.orga_team.title,
249
            membership.duties,
250
        )
251
252
    orgas = {to_orga(ms) for ms in memberships}
253
    orgas = {orga for orga in orgas if not orga.user.deleted}
254
255
    return orgas
256
257
258 1
def _get_public_orga_users_by_id(
259
    memberships: DbMembership,
260
) -> Dict[UserID, User]:
261
    user_ids = {ms.user_id for ms in memberships}
262
263
    users = user_service.find_users(user_ids, include_avatars=True)
264
265
    # Each of these users is an organizer.
266
    users = {dataclasses.replace(u, is_orga=True) for u in users}
267
268
    return {user.id: user for user in users}
269
270
271 1
def has_team_memberships(team_id: OrgaTeamID) -> bool:
272
    """Return `True` if the team has memberships."""
273
    return db.session \
274
        .query(
275
            db.session
276
                .query(DbMembership)
277
                .filter(DbMembership.orga_team_id == team_id)
278
                .exists()
279
        ) \
280
        .scalar()
281
282
283 1
def _db_entity_to_membership(membership: DbMembership) -> Membership:
284
    return Membership(
285
        membership.id,
286
        membership.orga_team_id,
287
        membership.user_id,
288
        membership.duties,
289
    )
290
291
292
# -------------------------------------------------------------------- #
293
# organizers
294
295
296 1
def get_unassigned_orgas_for_party(party_id: PartyID) -> Set[User]:
297
    """Return organizers that are not assigned to a team for the party."""
298
    party = party_service.get_party(party_id)
299
300
    assigned_orgas = DbUser.query \
301
        .join(DbMembership) \
302
        .join(DbOrgaTeam) \
303
        .filter(DbOrgaTeam.party_id == party.id) \
304
        .options(db.load_only(DbUser.id)) \
305
        .all()
306
    assigned_orga_ids = frozenset(user.id for user in assigned_orgas)
307
308
    unassigned_orga_ids_query = db.session \
309
        .query(DbUser.id)
310
311
    if assigned_orga_ids:
312
        unassigned_orga_ids_query = unassigned_orga_ids_query \
313
            .filter(db.not_(DbUser.id.in_(assigned_orga_ids)))
314
315
    unassigned_orga_ids = unassigned_orga_ids_query \
316
        .filter_by(deleted=False) \
317
        .join(DbOrgaFlag).filter(DbOrgaFlag.brand_id == party.brand_id) \
318
        .all()
319
320
    return user_service.find_users(unassigned_orga_ids)
321
322
323 1
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool:
324
    """Return `True` if the user is an organizer (i.e. is member of an
325
    organizer team) of that party.
326
    """
327 1
    return db.session \
328
        .query(
329
            db.session
330
                .query(DbMembership)
331
                .filter(DbMembership.user_id == user_id)
332
                .join(DbOrgaTeam)
333
                .filter(DbOrgaTeam.party_id == party_id)
334
                .exists()
335
        ) \
336
        .scalar()
337