Passed
Push — main ( a0d2b9...52c0ea )
by Jochen
04:32
created

byceps.services.site.service.get_sites()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 1
dl 0
loc 11
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1
"""
byceps.services.site.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:Copyright: 2006-2022 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""
8
9 1
from __future__ import annotations
10 1
import dataclasses
11 1
from typing import Optional, Union
12
13 1
from sqlalchemy import select
14
15 1
from ...database import db
16 1
from ...typing import BrandID, PartyID
17
18 1
from ..board.transfer.models import BoardID
19 1
from ..brand import service as brand_service
20 1
from ..news import channel_service as news_channel_service
21 1
from ..news.transfer.models import ChannelID as NewsChannelID
22 1
from ..shop.storefront.transfer.models import StorefrontID
23
24 1
from .dbmodels.site import Site as DbSite
25 1
from .dbmodels.setting import Setting as DbSetting
26 1
from .transfer.models import Site, SiteID, SiteWithBrand
27
28
29 1
class UnknownSiteId(Exception):
    """Raised when no site exists for a given site ID."""
31
32
33 1
def create_site(
    site_id: SiteID,
    title: str,
    server_name: str,
    brand_id: BrandID,
    *,
    enabled: bool = False,
    user_account_creation_enabled: bool = False,
    login_enabled: bool = False,
    party_id: Optional[PartyID] = None,
    board_id: Optional[BoardID] = None,
    storefront_id: Optional[StorefrontID] = None,
) -> Site:
    """Create a site, persist it, and return it as a transfer object.

    The associations with a party, a board, and a storefront are
    optional.
    """
    positional_values = (
        site_id,
        title,
        server_name,
        brand_id,
        enabled,
        user_account_creation_enabled,
        login_enabled,
    )
    optional_references = {
        'party_id': party_id,
        'board_id': board_id,
        'storefront_id': storefront_id,
    }

    db_site = DbSite(*positional_values, **optional_references)

    db.session.add(db_site)
    db.session.commit()

    return _db_entity_to_site(db_site)
64
65
66 1
def update_site(
    site_id: SiteID,
    title: str,
    server_name: str,
    brand_id: BrandID,
    party_id: Optional[PartyID],
    enabled: bool,
    user_account_creation_enabled: bool,
    login_enabled: bool,
    board_id: Optional[BoardID],
    storefront_id: Optional[StorefrontID],
    archived: bool,
) -> Site:
    """Overwrite the attributes of an existing site and commit.

    Raise `UnknownSiteId` if no site with that ID exists.
    """
    db_site = _get_db_site(site_id)

    # Applied in insertion order, one attribute assignment each.
    new_values = {
        'title': title,
        'server_name': server_name,
        'brand_id': brand_id,
        'party_id': party_id,
        'enabled': enabled,
        'user_account_creation_enabled': user_account_creation_enabled,
        'login_enabled': login_enabled,
        'board_id': board_id,
        'storefront_id': storefront_id,
        'archived': archived,
    }
    for attribute_name, value in new_values.items():
        setattr(db_site, attribute_name, value)

    db.session.commit()

    return _db_entity_to_site(db_site)
96
97
98 1
def delete_site(site_id: SiteID) -> None:
    """Delete the site with that ID along with its settings.

    The site's settings are deleted before the site itself; both
    deletions are committed together.
    """
    settings_of_site = db.session.query(DbSetting).filter_by(site_id=site_id)
    settings_of_site.delete()

    site_itself = db.session.query(DbSite).filter_by(id=site_id)
    site_itself.delete()

    db.session.commit()
109
110
111 1
def _find_db_site(site_id: SiteID) -> Optional[DbSite]:
    """Look up the site's database entity by its ID.

    Return `None` if there is no such site.
    """
    db_site = db.session.get(DbSite, site_id)
    return db_site
113
114
115 1
def _get_db_site(site_id: SiteID) -> DbSite:
    """Return the database entity of the site with that ID.

    Raise `UnknownSiteId` if no such site exists.
    """
    db_site = _find_db_site(site_id)

    if db_site is not None:
        return db_site

    raise UnknownSiteId(site_id)
122
123
124 1
def find_site(site_id: SiteID) -> Optional[Site]:
    """Look up a site by its ID; return `None` if there is no match."""
    db_site = _find_db_site(site_id)
    return _db_entity_to_site(db_site) if db_site is not None else None
132
133
134 1
def get_site(site_id: SiteID) -> Site:
    """Fetch the site with that ID.

    Raise `UnknownSiteId` if no such site exists.
    """
    return _db_entity_to_site(_get_db_site(site_id))
138
139
140 1
def get_all_sites() -> set[Site]:
    """Load every site and return them as transfer objects."""
    db_sites = db.session.query(DbSite).all()
    return set(map(_db_entity_to_site, db_sites))
145
146
147 1
def get_sites(site_ids: set[SiteID]) -> list[Site]:
    """Return the sites with those IDs.

    IDs for which no site exists are silently skipped, so the result
    may contain fewer sites than IDs were given. No ordering is
    requested from the database.
    """
    # Nothing to look up: skip the database round trip (and an empty
    # `IN` clause) altogether.
    if not site_ids:
        return []

    db_sites = db.session.execute(
        select(DbSite)
        .where(DbSite.id.in_(site_ids))
    ).scalars().all()

    return [_db_entity_to_site(db_site) for db_site in db_sites]
158
159
160 1
def get_sites_for_brand(brand_id: BrandID) -> set[Site]:
    """Collect all sites that belong to the given brand."""
    brand_sites_query = db.session.query(DbSite).filter_by(brand_id=brand_id)
    return {_db_entity_to_site(db_site) for db_site in brand_sites_query.all()}
168
169
170 1
def get_current_sites(
    brand_id: Optional[BrandID] = None, *, include_brands: bool = False
) -> set[Union[Site, SiteWithBrand]]:
    """Return the sites that are enabled and not archived.

    If a brand ID is given, only sites of that brand are considered.
    With `include_brands`, the brands are loaded along with the sites
    and each result is a `SiteWithBrand` instead of a plain `Site`.
    """
    site_query = db.session.query(DbSite)

    if brand_id is not None:
        site_query = site_query.filter_by(brand_id=brand_id)

    if include_brands:
        site_query = site_query.options(db.joinedload(DbSite.brand))

    db_sites = (
        site_query
        .filter_by(enabled=True)
        .filter_by(archived=False)
        .all()
    )

    to_transfer_object = (
        _db_entity_to_site_with_brand if include_brands else _db_entity_to_site
    )

    return set(map(to_transfer_object, db_sites))
193
194
195 1
def _db_entity_to_site(site: DbSite) -> Site:
    """Convert a site database entity into a `Site` transfer object.

    The IDs of the site's news channels are collected into a
    frozenset; all other fields are copied from the entity's
    attributes of the same name.
    """
    channel_ids = frozenset(channel.id for channel in site.news_channels)

    copied_attribute_names = (
        'id',
        'title',
        'server_name',
        'brand_id',
        'party_id',
        'enabled',
        'user_account_creation_enabled',
        'login_enabled',
        'board_id',
        'storefront_id',
        'archived',
    )
    copied_values = {name: getattr(site, name) for name in copied_attribute_names}

    return Site(news_channel_ids=channel_ids, **copied_values)
212
213
214 1
def _db_entity_to_site_with_brand(site_entity: DbSite) -> SiteWithBrand:
    """Convert a site database entity into a `SiteWithBrand`.

    The result is constructed positionally from the field values of
    the corresponding `Site`, followed by the site's brand as the last
    argument.
    """
    site = _db_entity_to_site(site_entity)
    brand = brand_service._db_entity_to_brand(site_entity.brand)

    # Copy the field values shallowly. `dataclasses.astuple()` would
    # deep-copy every value and recursively flatten any nested
    # dataclass into a plain tuple, which is neither needed nor wanted.
    site_values = tuple(
        getattr(site, field.name) for field in dataclasses.fields(site)
    )

    return SiteWithBrand(*site_values, brand)
222
223
224 1
def add_news_channel(site_id: SiteID, news_channel_id: NewsChannelID) -> None:
    """Attach the news channel to the site and commit the change.

    Raise `UnknownSiteId` if no site with that ID exists.
    """
    db_site = _get_db_site(site_id)
    db_channel = news_channel_service.get_db_channel(news_channel_id)

    channels_of_site = db_site.news_channels
    channels_of_site.append(db_channel)

    db.session.commit()
231
232
233 1
def remove_news_channel(
    site_id: SiteID, news_channel_id: NewsChannelID
) -> None:
    """Detach the news channel from the site and commit the change.

    Raise `UnknownSiteId` if no site with that ID exists.
    """
    db_site = _get_db_site(site_id)
    db_channel = news_channel_service.get_db_channel(news_channel_id)

    channels_of_site = db_site.news_channels
    channels_of_site.remove(db_channel)

    db.session.commit()
242