1
|
|
|
""" |
2
|
|
|
byceps.services.consent.subject_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2020 Jochen Kupperschmidt |
6
|
|
|
:License: Modified BSD, see LICENSE for details. |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from typing import Dict, Optional, Set |
10
|
|
|
|
11
|
1 |
|
from ...database import db |
12
|
1 |
|
from ...typing import BrandID |
13
|
|
|
|
14
|
1 |
|
from .models.brand_requirement import BrandRequirement as DbBrandRequirement |
15
|
1 |
|
from .models.consent import Consent as DbConsent |
16
|
1 |
|
from .models.subject import Subject as DbSubject |
17
|
1 |
|
from .transfer.models import Subject, SubjectID |
18
|
|
|
|
19
|
|
|
|
20
|
|
|
# -------------------------------------------------------------------- # |
21
|
|
|
# subjects |
22
|
|
|
|
23
|
|
|
|
24
|
1 |
|
class UnknownSubjectId(ValueError):
    """Indicate that one or more requested subject IDs are unknown."""
26
|
|
|
|
27
|
|
|
|
28
|
1 |
|
def create_subject(
    name: str,
    title: str,
    checkbox_label: str,
    checkbox_link_target: Optional[str],
) -> Subject:
    """Create a new subject.

    The subject is added to the database session and committed right
    away. The persisted entity is then returned as a transfer object
    (not merely its ID, as the previous return annotation suggested).
    """
    subject = DbSubject(name, title, checkbox_label, checkbox_link_target)

    db.session.add(subject)
    db.session.commit()

    return _db_entity_to_subject(subject)
41
|
|
|
|
42
|
|
|
|
43
|
1 |
|
def get_subjects(subject_ids: Set[SubjectID]) -> Set[Subject]:
    """Return the subjects with the given IDs.

    Raise `UnknownSubjectId` if no subject was found for at least one
    of the IDs.
    """
    db_subjects = (
        DbSubject.query
        .filter(DbSubject.id.in_(subject_ids))
        .all()
    )

    found = {_db_entity_to_subject(db_subject) for db_subject in db_subjects}

    # Fail loudly instead of silently returning fewer subjects than
    # were asked for.
    _check_for_unknown_subject_ids(subject_ids, found)

    return found
54
|
|
|
|
55
|
|
|
|
56
|
1 |
|
def _check_for_unknown_subject_ids(
    subject_ids: Set[SubjectID], subjects: Set[Subject]
) -> None:
    """Raise exception on unknown IDs.

    An ID counts as unknown if it is contained in `subject_ids` but
    none of the given `subjects` has that ID. If every ID is accounted
    for, nothing happens.

    Raise `UnknownSubjectId` with a message that lists all unknown IDs.
    """
    found_subject_ids = {subject.id for subject in subjects}
    unknown_subject_ids = subject_ids.difference(found_subject_ids)
    if not unknown_subject_ids:
        return

    # Sets have no defined iteration order; sort the textual IDs so the
    # error message is reproducible across runs.
    unknown_subject_ids_str = ', '.join(sorted(map(str, unknown_subject_ids)))
    raise UnknownSubjectId(
        f'Unknown subject IDs: {unknown_subject_ids_str}'
    )
67
|
|
|
|
68
|
|
|
|
69
|
1 |
|
def get_subjects_with_consent_counts(
    *, limit_to_subject_ids: Optional[Set[SubjectID]] = None
) -> Dict[Subject, int]:
    """Return subjects and their consent counts.

    If `limit_to_subject_ids` is given and not empty, only subjects
    with one of these IDs are included. Otherwise, all subjects are
    included.
    """
    # Outer join: subjects without any consent stay in the result.
    # `COUNT` over the consent's user ID column ignores the NULLs the
    # join produces for them, so their count is zero.
    query = db.session \
        .query(
            DbSubject,
            db.func.count(DbConsent.user_id)
        ) \
        .outerjoin(DbConsent)

    # NOTE(review): This is a truthiness test, so an empty set is
    # treated like `None` (no limit) rather than "no subjects at all".
    # Confirm that callers expect this before changing it.
    if limit_to_subject_ids:
        query = query \
            .filter(DbSubject.id.in_(limit_to_subject_ids))

    rows = query \
        .group_by(DbSubject.id) \
        .all()

    return {
        _db_entity_to_subject(subject): consent_count
        for subject, consent_count in rows
    }
90
|
|
|
|
91
|
|
|
|
92
|
1 |
|
def _db_entity_to_subject(subject: DbSubject) -> Subject:
    """Build a subject transfer object from the database entity."""
    # Order matters: the values are passed positionally.
    values = (
        subject.id,
        subject.name,
        subject.title,
        subject.checkbox_label,
        subject.checkbox_link_target,
    )
    return Subject(*values)
100
|
|
|
|
101
|
|
|
|
102
|
|
|
# -------------------------------------------------------------------- # |
103
|
|
|
# brand requirements |
104
|
|
|
|
105
|
|
|
|
106
|
1 |
|
def create_brand_requirement(brand_id: BrandID, subject_id: SubjectID) -> None:
    """Persist a requirement of the subject for the brand.

    The new requirement is committed immediately.
    """
    requirement = DbBrandRequirement(brand_id, subject_id)
    session = db.session
    session.add(requirement)
    session.commit()
112
|
|
|
|
113
|
|
|
|
114
|
1 |
|
def delete_brand_requirement(brand_id: BrandID, subject_id: SubjectID) -> None:
    """Remove the requirement of the subject for the brand.

    The deletion is committed immediately.
    """
    # A single `filter_by` with both criteria is AND-combined, just
    # like two chained `filter_by` calls.
    matching_requirements = (
        db.session
        .query(DbBrandRequirement)
        .filter_by(brand_id=brand_id, subject_id=subject_id)
    )
    matching_requirements.delete()

    db.session.commit()
122
|
|
|
|
123
|
|
|
|
124
|
1 |
|
def get_subject_ids_required_for_brand(brand_id: BrandID) -> Set[SubjectID]:
    """Return the IDs of all subjects the brand requires."""
    id_rows = (
        db.session
        .query(DbSubject.id)
        .join(DbBrandRequirement)
        .filter(DbBrandRequirement.brand_id == brand_id)
        .all()
    )

    # Only one column is selected, so each row is a one-element tuple.
    return {subject_id for (subject_id,) in id_rows}
133
|
|
|
|
134
|
|
|
|
135
|
1 |
|
def get_subjects_required_for_brand(brand_id: BrandID) -> Set[Subject]:
    """Return all subjects the brand requires as transfer objects."""
    db_subjects = (
        DbSubject.query
        .join(DbBrandRequirement)
        .filter(DbBrandRequirement.brand_id == brand_id)
        .all()
    )

    return set(map(_db_entity_to_subject, db_subjects))
143
|
|
|
|