Passed
Push — main ( 0b198e...cd8d81 )
by Jochen
05:10
created

byceps.services.orga_team.service.create_team()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 8
ccs 5
cts 5
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.orga_team.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from itertools import groupby
11 1
from typing import Iterable, Optional, Sequence
12
13 1
from sqlalchemy import select
14
15 1
from ...database import db
16 1
from ...typing import PartyID, UserID
17
18 1
from ..orga.dbmodels import OrgaFlag as DbOrgaFlag
19 1
from ..party import service as party_service
20 1
from ..party.transfer.models import PartyID
21 1
from ..user.dbmodels.detail import UserDetail as DbUserDetail
22 1
from ..user.dbmodels.user import User as DbUser
23 1
from ..user import service as user_service
24 1
from ..user.transfer.models import User
25
26 1
from .dbmodels import Membership as DbMembership, OrgaTeam as DbOrgaTeam
27 1
from .transfer.models import (
28
    Member,
29
    Membership,
30
    MembershipID,
31
    OrgaActivity,
32
    OrgaTeam,
33
    OrgaTeamID,
34
    PublicOrga,
35
    TeamAndDuties,
36
)
37
38
39
# -------------------------------------------------------------------- #
40
# teams
41
42
43 1
def create_team(party_id: PartyID, title: str) -> OrgaTeam:
    """Persist a new orga team with that title for the party.

    The new database entity is added to the session and committed
    right away; the returned object is built from that entity.
    """
    db_team = DbOrgaTeam(party_id, title)

    session = db.session
    session.add(db_team)
    session.commit()

    return _db_entity_to_team(db_team)
51
52
53 1
def delete_team(team_id: OrgaTeamID) -> None:
    """Remove the orga team with that ID and commit the change.

    The deletion is issued as a query-level delete filtered by the ID.
    """
    teams_with_id = db.session.query(DbOrgaTeam).filter_by(id=team_id)
    teams_with_id.delete()

    db.session.commit()
60
61
62 1
def count_teams_for_parties(party_ids: set[PartyID]) -> dict[PartyID, int]:
    """Count orga teams for each of the given parties.

    Return a mapping from party ID to the number of its teams. The
    counts come from a grouped query over the team table, so a party
    without any team has no entry in the result.

    An empty `party_ids` yields an empty mapping without querying the
    database.
    """
    if not party_ids:
        # Nothing to look up; avoid a query with an empty `IN` clause.
        return {}

    rows = (
        db.session
        .query(DbOrgaTeam.party_id, db.func.count(DbOrgaTeam.id))
        .filter(DbOrgaTeam.party_id.in_(party_ids))
        .group_by(DbOrgaTeam.party_id)
        .all()
    )

    # Each row is a `(party_id, team_count)` pair.
    return dict(rows)
74
75
76 1
def count_teams_for_party(party_id: PartyID) -> int:
    """Return how many orga teams exist for that party."""
    teams_of_party = (
        db.session
        .query(DbOrgaTeam)
        .filter_by(party_id=party_id)
    )
    return teams_of_party.count()
82
83
84 1
def get_teams_for_party(party_id: PartyID) -> set[OrgaTeam]:
    """Return the set of orga teams that belong to that party."""
    db_teams = (
        db.session
        .query(DbOrgaTeam)
        .filter_by(party_id=party_id)
        .all()
    )

    return set(map(_db_entity_to_team, db_teams))
92
93
94 1
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]:
    """Look up the team by its ID.

    Return `None` if there is no team with that ID.
    """
    db_team = _find_db_team(team_id)
    return _db_entity_to_team(db_team) if db_team is not None else None
102
103
104 1
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]:
    """Fetch the team's database entity by its ID.

    Return `None` if no such entity is found.
    """
    team_query = db.session.query(DbOrgaTeam)
    return team_query.get(team_id)
107
108
109 1
def get_teams_and_members_for_party(
    party_id: PartyID,
) -> Sequence[tuple[OrgaTeam, set[Member]]]:
    """Return every orga team of that party paired with its members.

    Each member combines a membership with the user it refers to. The
    users of all memberships are fetched in a single call (avatars
    included) and then looked up by ID.
    """
    db_teams = (
        db.session
        .query(DbOrgaTeam)
        .options(db.joinedload(DbOrgaTeam.memberships))
        .filter_by(party_id=party_id)
        .all()
    )

    # Collect the user IDs across all teams so users are loaded only once.
    member_user_ids = set()
    for db_team in db_teams:
        member_user_ids.update(ms.user_id for ms in db_team.memberships)

    users_by_id = user_service.index_users_by_id(
        user_service.get_users(member_user_ids, include_avatars=True)
    )

    teams_and_members = []
    for db_team in db_teams:
        team = _db_entity_to_team(db_team)

        members = set()
        for db_membership in db_team.memberships:
            membership = _db_entity_to_membership(db_membership)
            members.add(Member(membership, users_by_id[membership.user_id]))

        teams_and_members.append((team, members))

    return teams_and_members
135
136
137 1
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam:
    """Build an `OrgaTeam` from the team's database entity."""
    team_id, party_id, title = team.id, team.party_id, team.title
    return OrgaTeam(team_id, party_id, title)
143
144
145
# -------------------------------------------------------------------- #
146
# memberships
147
148
149 1
def create_membership(
    team_id: OrgaTeamID, user_id: UserID, duties: str
) -> Membership:
    """Add the user to the team and return the new membership.

    The duties are only set on the new entity if the given value is
    truthy (i.e. not empty). The entity is committed right away.
    """
    db_membership = DbMembership(team_id, user_id)
    if duties:
        db_membership.duties = duties

    session = db.session
    session.add(db_membership)
    session.commit()

    return _db_entity_to_membership(db_membership)
162
163
164 1
def update_membership(
    membership_id: MembershipID, team_id: OrgaTeamID, duties: str
) -> Membership:
    """Assign the membership to the given team and replace its duties.

    Raise `ValueError` if the membership ID or the team ID is unknown;
    the membership is checked before the team.
    """
    db_membership = _find_db_membership(membership_id)
    if db_membership is None:
        raise ValueError(f"Unknown membership ID '{membership_id}'")

    db_team = _find_db_team(team_id)
    if db_team is None:
        raise ValueError(f"Unknown team ID '{team_id}'")

    # Both lookups succeeded; apply the changes and persist them.
    db_membership.orga_team = db_team
    db_membership.duties = duties
    db.session.commit()

    return _db_entity_to_membership(db_membership)
181
182
183 1
def delete_membership(membership_id: MembershipID) -> None:
    """Remove the membership with that ID and commit the change.

    The deletion is issued as a query-level delete filtered by the ID.
    """
    memberships_with_id = (
        db.session
        .query(DbMembership)
        .filter_by(id=membership_id)
    )
    memberships_with_id.delete()

    db.session.commit()
190
191
192 1
def count_memberships_for_party(party_id: PartyID) -> int:
    """Return the total number of memberships across the party's teams."""
    memberships_of_party = (
        db.session
        .query(DbMembership)
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
    )
    return memberships_of_party.count()
199
200
201 1
def get_memberships_for_party(party_id: PartyID) -> set[Membership]:
    """Return all memberships in orga teams of that party."""
    db_memberships = (
        db.session
        .query(DbMembership)
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
        .all()
    )

    return set(map(_db_entity_to_membership, db_memberships))
210
211
212 1
def find_membership(membership_id: MembershipID) -> Optional[Membership]:
    """Look up the membership by its ID.

    Return `None` if there is no membership with that ID.
    """
    db_membership = _find_db_membership(membership_id)
    if db_membership is not None:
        return _db_entity_to_membership(db_membership)

    return None
220
221
222 1
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]:
    """Fetch the membership's database entity by its ID.

    Return `None` if no such entity is found.
    """
    membership_query = db.session.query(DbMembership)
    return membership_query.get(membership_id)
225
226
227 1
def get_orga_teams_for_user_and_party(
    user_id: UserID, party_id: PartyID
) -> set[OrgaTeam]:
    """Return the orga teams of that party the user is a member of.

    Note that the result contains the teams themselves, not the user's
    memberships in them.
    """
    stmt = (
        select(DbOrgaTeam)
        .join(DbMembership)
        .filter(DbMembership.user_id == user_id)
        .filter(DbOrgaTeam.party_id == party_id)
    )
    db_orga_teams = db.session.execute(stmt).scalars().all()

    return {_db_entity_to_team(db_team) for db_team in db_orga_teams}
239
240
241 1
def get_orga_activities_for_user(user_id: UserID) -> set[OrgaActivity]:
    """Return all orga team activities for that user.

    The user's memberships are loaded together with their teams and
    grouped by user ID and party ID. Each group becomes one
    `OrgaActivity` holding the party and the team title/duties pairs of
    the group's memberships.
    """
    stmt = (
        select(DbMembership)
        .options(db.joinedload(DbMembership.orga_team))
        .filter_by(user_id=user_id)
    )
    db_memberships = db.session.execute(stmt).scalars().all()

    # Fetch all referenced parties in one call and index them by ID.
    party_ids = {ms.orga_team.party_id for ms in db_memberships}
    parties_by_id = {
        party.id: party for party in party_service.get_parties(party_ids)
    }

    def group_key(ms: DbMembership) -> tuple[UserID, PartyID]:
        return (ms.user_id, ms.orga_team.party_id)

    def to_teams_and_duties(
        group: Iterable[DbMembership],
    ) -> frozenset[TeamAndDuties]:
        return frozenset(
            TeamAndDuties(team_title=ms.orga_team.title, duties=ms.duties)
            for ms in group
        )

    activities = set()

    # `groupby` only merges adjacent items, hence the sort by the same key.
    ordered_memberships = sorted(db_memberships, key=group_key)
    for (member_id, party_id), group in groupby(
        ordered_memberships, key=group_key
    ):
        activities.add(
            OrgaActivity(
                user_id=member_id,
                party=parties_by_id[party_id],
                teams_and_duties=to_teams_and_duties(group),
            )
        )

    return activities
279
280
281 1
def get_public_orgas_for_party(party_id: PartyID) -> set[PublicOrga]:
    """Return all public orgas for that party.

    One `PublicOrga` is built per membership in the party's teams from
    the user, the user's full name, the team title and the duties.
    Orgas whose user is marked as deleted are left out.
    """
    # Eager-load the team, and only the needed columns of the user and
    # the user's detail record.
    load_options = (
        db.joinedload(DbMembership.orga_team),
        db.joinedload(DbMembership.user)
            .load_only(DbUser.id),
        db.joinedload(DbMembership.user)
            .joinedload(DbUser.detail)
            .load_only(DbUserDetail.first_names, DbUserDetail.last_name),
    )

    db_memberships = (
        db.session
        .query(DbMembership)
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
        .options(*load_options)
        .all()
    )

    users_by_id = _get_public_orga_users_by_id(db_memberships)

    public_orgas = set()
    for db_membership in db_memberships:
        orga = PublicOrga(
            users_by_id[db_membership.user_id],
            db_membership.user.detail.full_name,
            db_membership.orga_team.title,
            db_membership.duties,
        )
        if not orga.user.deleted:
            public_orgas.add(orga)

    return public_orgas
313
314
315 1
def _get_public_orga_users_by_id(
    memberships: Iterable[DbMembership],
) -> dict[UserID, User]:
    """Load the users the memberships refer to, indexed by user ID.

    The users are fetched in a single call, avatars included.
    """
    # Deduplicate: the same user ID may appear in several memberships.
    user_ids = {ms.user_id for ms in memberships}
    users = user_service.get_users(user_ids, include_avatars=True)
    return user_service.index_users_by_id(users)
321
322
323 1
def has_team_memberships(team_id: OrgaTeamID) -> bool:
    """Return `True` if at least one membership refers to that team."""
    memberships_of_team = (
        db.session
        .query(DbMembership)
        .filter(DbMembership.orga_team_id == team_id)
    )

    # Wrap the query in an `EXISTS` and fetch its single value.
    return db.session.query(memberships_of_team.exists()).scalar()
333
334
335 1
def _db_entity_to_membership(membership: DbMembership) -> Membership:
    """Build a `Membership` from the membership's database entity."""
    fields = (
        membership.id,
        membership.orga_team_id,
        membership.user_id,
        membership.duties,
    )
    return Membership(*fields)
342
343
344
# -------------------------------------------------------------------- #
345
# copying teams and memberships
346
347
348 1
def copy_teams_and_memberships(
    source_party_id: PartyID, target_party_id: PartyID
) -> int:
    """Copy teams and memberships from one party to another.

    For every team of the source party, a team with the same title is
    created for the target party, and every member of the source team
    is assigned to the new team with the same duties.

    Return the number of teams.
    """
    teams_and_members = get_teams_and_members_for_party(source_party_id)

    for original_team, original_members in teams_and_members:
        new_team = create_team(target_party_id, original_team.title)

        for member in original_members:
            user_id = member.user.id
            duties = member.membership.duties
            create_membership(new_team.id, user_id, duties)

    return len(teams_and_members)
367
368
369
# -------------------------------------------------------------------- #
370
# organizers
371
372
373 1
def get_unassigned_orgas_for_team(team: OrgaTeam) -> set[User]:
    """Return eligible organizers that are not assigned to this team.

    Candidates are users that are not deleted and have an orga flag for
    the brand of the team's party; users that already are members of
    the team are excluded.
    """
    party = party_service.get_party(team.party_id)

    assigned_ids_stmt = (
        select(DbMembership.user_id)
        .filter_by(orga_team_id=team.id)
    )
    assigned_orga_ids = set(
        db.session.execute(assigned_ids_stmt).scalars().all()
    )

    candidate_ids_stmt = select(DbUser.id)
    if assigned_orga_ids:
        # Only add the exclusion if there actually is someone to exclude.
        already_assigned = DbUser.id.in_(assigned_orga_ids)
        candidate_ids_stmt = candidate_ids_stmt.filter(
            db.not_(already_assigned)
        )

    candidate_ids_stmt = (
        candidate_ids_stmt
        # `==` is intentional: it builds an SQL comparison expression.
        .filter(DbUser.deleted == False)
        .join(DbOrgaFlag)
        .filter(DbOrgaFlag.brand_id == party.brand_id)
    )
    unassigned_orga_ids = set(
        db.session.execute(candidate_ids_stmt).scalars().all()
    )

    return user_service.get_users(unassigned_orga_ids)
399
400
401 1
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool:
    """Tell whether the user is an organizer of that party, i.e. is a
    member of at least one of the party's organizer teams.
    """
    memberships_in_party = (
        db.session
        .query(DbMembership)
        .filter(DbMembership.user_id == user_id)
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
    )

    # Wrap the query in an `EXISTS` and fetch its single value.
    return db.session.query(memberships_in_party.exists()).scalar()
415
416
417 1
def select_orgas_for_party(
    user_ids: set[UserID], party_id: PartyID
) -> set[UserID]:
    """Return the subset of the given user IDs that belong to organizers
    (i.e. members of an organizer team) of that party.

    An empty `user_ids` yields an empty set without querying the
    database.
    """
    if not user_ids:
        return set()

    stmt = (
        select(DbMembership.user_id)
        .select_from(DbMembership)
        .filter(DbMembership.user_id.in_(user_ids))
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
    )
    orga_ids = db.session.execute(stmt).scalars().all()

    # A user may be in several teams of the party; the set deduplicates.
    return set(orga_ids)
435