Completed
Push — main ( 97c1ff...2a7d5a )
by Jochen
05:40
created

byceps.services.orga_team.service._find_db_team()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
nop 1
ccs 1
cts 2
cp 0.5
crap 1.125
1
"""
2
byceps.services.orga_team.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from typing import Optional, Sequence, Set
10
11 1
from ...database import db
12 1
from ...typing import PartyID, UserID
13
14 1
from ..orga.models import OrgaFlag as DbOrgaFlag
15 1
from ..party import service as party_service
16 1
from ..user.models.user import User as DbUser
17 1
from ..user import service as user_service
18 1
from ..user.transfer.models import User
19
20 1
from .models import Membership as DbMembership, OrgaTeam as DbOrgaTeam
21 1
from .transfer.models import MembershipID, OrgaTeam, OrgaTeamID
22
23
24
# -------------------------------------------------------------------- #
25
# teams
26
27
28 1
def create_team(party_id: PartyID, title: str) -> OrgaTeam:
    """Persist a new orga team for the party and return it as a transfer object."""
    db_team = DbOrgaTeam(party_id, title)

    session = db.session
    session.add(db_team)
    session.commit()

    return _db_entity_to_team(db_team)
36
37
38 1
def delete_team(team_id: OrgaTeamID) -> None:
    """Remove the orga team with that ID and commit the change."""
    matching_teams = db.session.query(DbOrgaTeam).filter_by(id=team_id)
    matching_teams.delete()

    db.session.commit()
45
46
47 1
def count_teams_for_party(party_id: PartyID) -> int:
    """Count the orga teams that belong to that party."""
    party_teams = DbOrgaTeam.query.filter_by(party_id=party_id)
    return party_teams.count()
52
53
54 1
def get_teams_for_party(party_id: PartyID) -> Sequence[DbOrgaTeam]:
    """Fetch the party's orga teams, sorted by their title."""
    return (
        DbOrgaTeam.query
        .filter_by(party_id=party_id)
        .order_by(DbOrgaTeam.title)
        .all()
    )
60
61
62 1
def find_team(team_id: OrgaTeamID) -> Optional[OrgaTeam]:
    """Look up the team by ID.

    Returns the team as a transfer object, or `None` if no team has
    that ID.
    """
    db_team = _find_db_team(team_id)
    return _db_entity_to_team(db_team) if db_team is not None else None
70
71
72 1
def _find_db_team(team_id: OrgaTeamID) -> Optional[DbOrgaTeam]:
    """Look up the team's database entity by ID, yielding `None` if absent."""
    team_query = DbOrgaTeam.query
    return team_query.get(team_id)
75
76
77 1
def get_teams_for_party_with_memberships(
    party_id: PartyID
) -> Sequence[DbOrgaTeam]:
    """Fetch the party's orga teams with their memberships eagerly loaded."""
    return (
        DbOrgaTeam.query
        .options(db.joinedload('memberships'))
        .filter_by(party_id=party_id)
        .all()
    )
85
86
87 1
def _db_entity_to_team(team: DbOrgaTeam) -> OrgaTeam:
    """Build a transfer object from the team entity's ID, party ID and title."""
    return OrgaTeam(team.id, team.party_id, team.title)
93
94
95
# -------------------------------------------------------------------- #
96
# memberships
97
98
99 1
def create_membership(
    team_id: OrgaTeamID, user_id: UserID, duties: str
) -> DbMembership:
    """Add the user to the team and return the persisted membership.

    A falsy `duties` value (e.g. an empty string) is not written to
    the membership.
    """
    new_membership = DbMembership(team_id, user_id)

    if duties:
        new_membership.duties = duties

    session = db.session
    session.add(new_membership)
    session.commit()

    return new_membership
112
113
114 1
def update_membership(
    membership_id: MembershipID, team_id: OrgaTeamID, duties: str
) -> None:
    """Move the membership to the given team and replace its duties.

    Raises `ValueError` if either the membership or the team is
    unknown. The membership is checked first.
    """
    db_membership = _find_db_membership(membership_id)
    if db_membership is None:
        raise ValueError(f"Unknown membership ID '{membership_id}'")

    db_team = _find_db_team(team_id)
    if db_team is None:
        raise ValueError(f"Unknown team ID '{team_id}'")

    db_membership.orga_team = db_team
    db_membership.duties = duties

    db.session.commit()
129
130
131 1
def delete_membership(membership_id: MembershipID) -> None:
    """Remove the membership with that ID and commit the change."""
    matching_memberships = db.session \
        .query(DbMembership) \
        .filter_by(id=membership_id)
    matching_memberships.delete()

    db.session.commit()
138
139
140 1
def count_memberships_for_party(party_id: PartyID) -> int:
    """Count the memberships across all of the party's teams."""
    party_memberships = DbMembership.query.for_party(party_id)
    return party_memberships.count()
145
146
147 1
def find_membership(membership_id: MembershipID) -> Optional[DbMembership]:
    """Look up the membership by ID, yielding `None` if there is none."""
    membership = _find_db_membership(membership_id)
    return membership
150
151
152 1
def _find_db_membership(membership_id: MembershipID) -> Optional[DbMembership]:
    """Look up the membership's database entity by ID, yielding `None` if absent."""
    membership_query = DbMembership.query
    return membership_query.get(membership_id)
155
156
157 1
def find_membership_for_party(
    user_id: UserID, party_id: PartyID
) -> Optional[DbMembership]:
    """Return the user's membership in an orga team of that party, or
    `None` if the user is not part of an orga team for that party.
    """
    return (
        DbMembership.query
        .filter_by(user_id=user_id)
        .for_party(party_id)
        .one_or_none()
    )
167
168
169 1
def get_memberships_for_party(party_id: PartyID) -> Sequence[DbMembership]:
    """Fetch every orga team membership for that party.

    The team, the user (ID only) and the user's detail (first names
    and last name only) are loaded eagerly along with each membership.
    """
    party_memberships = DbMembership.query.for_party(party_id)

    return party_memberships.options(
        db.joinedload('orga_team'),
        db.joinedload('user').load_only('id'),
        db.joinedload('user').joinedload('detail').load_only('first_names', 'last_name'),
    ).all()
179
180
181 1
def get_memberships_for_user(user_id: UserID) -> Sequence[DbMembership]:
    """Fetch every orga team membership of that user.

    Each membership's team and that team's party are loaded eagerly.
    """
    return (
        DbMembership.query
        .options(db.joinedload('orga_team').joinedload('party'))
        .filter_by(user_id=user_id)
        .all()
    )
189
190
191 1
def has_team_memberships(team_id: OrgaTeamID) -> bool:
    """Tell whether at least one membership belongs to the team."""
    # Build an EXISTS clause first, then select its value.
    membership_exists = (
        db.session
        .query(DbMembership)
        .filter(DbMembership.orga_team_id == team_id)
        .exists()
    )

    return db.session.query(membership_exists).scalar()
201
202
203
# -------------------------------------------------------------------- #
204
# organizers
205
206
207 1
def get_unassigned_orgas_for_party(party_id: PartyID) -> Set[User]:
    """Return organizers that are not assigned to a team for the party.

    Candidates are non-deleted users that carry an orga flag for the
    party's brand. Users with a membership in any of the party's orga
    teams are excluded.
    """
    party = party_service.get_party(party_id)

    assigned_orgas = DbUser.query \
        .join(DbMembership) \
        .join(DbOrgaTeam) \
        .filter(DbOrgaTeam.party_id == party.id) \
        .options(db.load_only(DbUser.id)) \
        .all()
    assigned_orga_ids = frozenset(user.id for user in assigned_orgas)

    unassigned_orga_ids_query = db.session \
        .query(DbUser.id)

    # Only add the exclusion filter if there is anything to exclude,
    # so no `NOT IN` over an empty collection is generated.
    if assigned_orga_ids:
        unassigned_orga_ids_query = unassigned_orga_ids_query \
            .filter(db.not_(DbUser.id.in_(assigned_orga_ids)))

    unassigned_orga_id_rows = unassigned_orga_ids_query \
        .filter_by(deleted=False) \
        .join(DbOrgaFlag).filter(DbOrgaFlag.brand_id == party.brand_id) \
        .all()

    # Querying a single column still yields one-element rows, not bare
    # values. Unwrap them so that actual user IDs are handed on.
    unassigned_orga_ids = {row[0] for row in unassigned_orga_id_rows}

    return user_service.find_users(unassigned_orga_ids)
232
233
234 1
def is_orga_for_party(user_id: UserID, party_id: PartyID) -> bool:
    """Tell whether the user is an organizer of that party, i.e. has a
    membership in one of the party's organizer teams.
    """
    # Build an EXISTS clause first, then select its value.
    orga_membership_exists = (
        db.session
        .query(DbMembership)
        .filter(DbMembership.user_id == user_id)
        .join(DbOrgaTeam)
        .filter(DbOrgaTeam.party_id == party_id)
        .exists()
    )

    return db.session.query(orga_membership_exists).scalar()
248