Completed
Push — main ( 8b0189...d431d8 )
by Jochen
03:33
created

delete_brand_requirement()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 8
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.consent.subject_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from typing import Dict, Optional, Set
10
11 1
from ...database import db
12 1
from ...typing import BrandID
13
14 1
from .models.brand_requirement import BrandRequirement as DbBrandRequirement
15 1
from .models.consent import Consent as DbConsent
16 1
from .models.subject import Subject as DbSubject
17 1
from .transfer.models import Subject, SubjectID
18
19
20
# -------------------------------------------------------------------- #
21
# subjects
22
23
24 1
class UnknownSubjectId(ValueError):
25 1
    pass
26
27
28 1
def create_subject(
29
    name: str,
30
    title: str,
31
    type_: str,
32
    checkbox_label: str,
33
    checkbox_link_target: Optional[str],
34
) -> SubjectID:
35
    """Create a new subject."""
36 1
    subject = DbSubject(
37
        name, title, type_, checkbox_label, checkbox_link_target
38
    )
39
40 1
    db.session.add(subject)
41 1
    db.session.commit()
42
43 1
    return _db_entity_to_subject(subject)
44
45
46 1
def get_subjects(subject_ids: Set[SubjectID]) -> Set[Subject]:
47
    """Return the subjects."""
48
    rows = DbSubject.query \
49
        .filter(DbSubject.id.in_(subject_ids)) \
50
        .all()
51
52
    subjects = {_db_entity_to_subject(row) for row in rows}
53
54
    _check_for_unknown_subject_ids(subject_ids, subjects)
55
56
    return subjects
57
58
59 1
def _check_for_unknown_subject_ids(
60
    subject_ids: Set[SubjectID], subjects: Set[Subject]
61
) -> None:
62
    """Raise exception on unknown IDs."""
63
    found_subject_ids = {subject.id for subject in subjects}
64
    unknown_subject_ids = subject_ids.difference(found_subject_ids)
65
    if unknown_subject_ids:
66
        unknown_subject_ids_str = ', '.join(map(str, unknown_subject_ids))
67
        raise UnknownSubjectId(
68
            f'Unknown subject IDs: {unknown_subject_ids_str}'
69
        )
70
71
72 1
def get_subjects_with_consent_counts() -> Dict[Subject, int]:
73
    """Return all subjects."""
74
    rows = db.session \
75
        .query(
76
            DbSubject,
77
            db.func.count(DbConsent.user_id)
78
        ) \
79
        .outerjoin(DbConsent) \
80
        .group_by(DbSubject.id) \
81
        .all()
82
83
    return {
84
        _db_entity_to_subject(subject): consent_count
85
        for subject, consent_count in rows
86
    }
87
88
89 1
def _db_entity_to_subject(subject: DbSubject) -> Subject:
90 1
    return Subject(
91
        subject.id,
92
        subject.name,
93
        subject.title,
94
        subject.type_,
95
        subject.checkbox_label,
96
        subject.checkbox_link_target,
97
    )
98
99
100
# -------------------------------------------------------------------- #
101
# brand requirements
102
103
104 1
def create_brand_requirement(brand_id: BrandID, subject_id: SubjectID) -> None:
105
    """Create a brand requirement."""
106 1
    brand_requirement = DbBrandRequirement(brand_id, subject_id)
107
108 1
    db.session.add(brand_requirement)
109 1
    db.session.commit()
110
111
112 1
def delete_brand_requirement(brand_id: BrandID, subject_id: SubjectID) -> None:
113
    """Delete a brand requirement."""
114 1
    db.session.query(DbBrandRequirement) \
115
        .filter_by(brand_id=brand_id) \
116
        .filter_by(subject_id=subject_id) \
117
        .delete()
118
119 1
    db.session.commit()
120
121
122 1
def get_subjects_required_for_brand(brand_id: BrandID) -> Set[Subject]:
123
    """Return the subjects required for the brand."""
124 1
    rows = DbSubject.query \
125
        .join(DbBrandRequirement) \
126
        .filter(DbBrandRequirement.brand_id == brand_id) \
127
        .all()
128
129
    return {_db_entity_to_subject(row) for row in rows}
130