Passed
Push — main ( bbdf0c...72e25d )
by Jochen
01:31
created

is_server_name_available()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 6
ccs 1
cts 1
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.site.site_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2024 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from collections.abc import Callable
10
import dataclasses
11 1
12 1
from sqlalchemy import delete, select
13
14 1
from byceps.database import db
15
from byceps.services.board.models import BoardID
16 1
from byceps.services.brand import brand_service
17 1
from byceps.services.brand.models import BrandID
18 1
from byceps.services.news import news_channel_service
19 1
from byceps.services.news.models import NewsChannelID
20 1
from byceps.services.party.models import PartyID
21 1
from byceps.services.shop.storefront.models import StorefrontID
22 1
23 1
from .dbmodels import DbSite, DbSiteSetting
24
from .models import Site, SiteID, SiteWithBrand
25 1
26 1
27
class UnknownSiteIdError(Exception):
    """Signal that no site could be found for a given site ID."""
29 1
30 1
31
def create_site(
    site_id: SiteID,
    title: str,
    server_name: str,
    brand_id: BrandID,
    *,
    enabled: bool = False,
    user_account_creation_enabled: bool = False,
    login_enabled: bool = False,
    party_id: PartyID | None = None,
    board_id: BoardID | None = None,
    storefront_id: StorefrontID | None = None,
    is_intranet: bool = False,
    check_in_on_login: bool = False,
) -> Site:
    """Store a new site in the database and return it.

    The party, board and storefront associations are optional. The
    new database entity is committed before it is converted into the
    returned `Site` object.
    """
    # These are handed to the `DbSite` constructor by keyword; all
    # other values are passed positionally.
    keyword_values = {
        'party_id': party_id,
        'board_id': board_id,
        'storefront_id': storefront_id,
        'is_intranet': is_intranet,
        'check_in_on_login': check_in_on_login,
    }

    new_db_site = DbSite(
        site_id,
        title,
        server_name,
        brand_id,
        enabled,
        user_account_creation_enabled,
        login_enabled,
        **keyword_values,
    )

    db.session.add(new_db_site)
    db.session.commit()

    return _db_entity_to_site(new_db_site)
66
67 1
68
def update_site(
    site_id: SiteID,
    title: str,
    server_name: str,
    party_id: PartyID | None,
    enabled: bool,
    user_account_creation_enabled: bool,
    login_enabled: bool,
    board_id: BoardID | None,
    storefront_id: StorefrontID | None,
    is_intranet: bool,
    check_in_on_login: bool,
    archived: bool,
) -> Site:
    """Overwrite the site's attributes with the given values.

    Commits the change and returns the updated site.

    Raises `UnknownSiteIdError` if no site with that ID exists.
    """
    site_entity = _get_db_site(site_id)

    # Attribute name -> new value, applied in insertion order.
    new_values = {
        'title': title,
        'server_name': server_name,
        'party_id': party_id,
        'enabled': enabled,
        'user_account_creation_enabled': user_account_creation_enabled,
        'login_enabled': login_enabled,
        'board_id': board_id,
        'storefront_id': storefront_id,
        'is_intranet': is_intranet,
        'check_in_on_login': check_in_on_login,
        'archived': archived,
    }
    for attr_name, new_value in new_values.items():
        setattr(site_entity, attr_name, new_value)

    db.session.commit()

    return _db_entity_to_site(site_entity)
100
101
102
def delete_site(site_id: SiteID) -> None:
    """Remove the site and its settings from the database."""
    # The settings rows are deleted before the site row; both
    # deletions are committed together.
    settings_deletion = delete(DbSiteSetting).filter_by(site_id=site_id)
    db.session.execute(settings_deletion)

    site_deletion = delete(DbSite).filter_by(id=site_id)
    db.session.execute(site_deletion)

    db.session.commit()
107
108 1
109 1
def _find_db_site(site_id: SiteID) -> DbSite | None:
    """Look up the site's database entity by ID, or `None` if absent."""
    site_entity = db.session.get(DbSite, site_id)
    return site_entity
111
112
113 1
def _get_db_site(site_id: SiteID) -> DbSite:
    """Return the site's database entity.

    Raises `UnknownSiteIdError` if no site with that ID exists.
    """
    site_entity = _find_db_site(site_id)

    if site_entity is not None:
        return site_entity

    raise UnknownSiteIdError(site_id)
120 1
121
122
def find_site(site_id: SiteID) -> Site | None:
    """Look up the site with that ID.

    Returns `None` instead of raising if no such site exists.
    """
    site_entity = _find_db_site(site_id)
    return _db_entity_to_site(site_entity) if site_entity is not None else None
130 1
131 1
132
def get_site(site_id: SiteID) -> Site:
    """Fetch the site with that ID.

    Raises `UnknownSiteIdError` if no site with that ID exists.
    """
    return _db_entity_to_site(_get_db_site(site_id))
136 1
137
138 1
def get_all_sites() -> set[Site]:
    """Fetch every site, without any filtering."""
    site_entities = db.session.scalars(select(DbSite)).all()

    return set(map(_db_entity_to_site, site_entities))
143
144 1
145
def get_sites(site_ids: set[SiteID]) -> list[Site]:
    """Fetch the sites whose IDs are contained in `site_ids`.

    An empty ID set yields an empty list without querying the
    database.
    """
    if not site_ids:
        return []

    stmt = select(DbSite).filter(DbSite.id.in_(site_ids))
    site_entities = db.session.scalars(stmt).all()

    return list(map(_db_entity_to_site, site_entities))
155
156
157
def get_sites_for_brand(brand_id: BrandID) -> set[Site]:
    """Fetch all sites that belong to the given brand."""
    stmt = select(DbSite).filter_by(brand_id=brand_id)
    site_entities = db.session.scalars(stmt).all()

    return set(map(_db_entity_to_site, site_entities))
164
165
166
def get_current_sites(
    brand_id: BrandID | None = None, *, include_brands: bool = False
) -> set[Site | SiteWithBrand]:
    """Fetch the "current" sites, i.e. those enabled and not archived.

    If `brand_id` is given, only sites of that brand are considered.
    With `include_brands`, each site's brand is loaded along with it
    and the results are `SiteWithBrand` objects instead of plain
    `Site` objects.
    """
    stmt = select(DbSite)

    if brand_id is not None:
        stmt = stmt.filter_by(brand_id=brand_id)

    # Choose the loading strategy and the matching converter together.
    to_result: Callable[[DbSite], Site | SiteWithBrand]
    if include_brands:
        stmt = stmt.options(db.joinedload(DbSite.brand))
        to_result = _db_entity_to_site_with_brand
    else:
        to_result = _db_entity_to_site

    stmt = stmt.filter_by(enabled=True, archived=False)

    # `unique()` deduplicates the returned entities.
    site_entities = db.session.scalars(stmt).unique().all()

    return set(map(to_result, site_entities))
189
190
191
def is_title_available(title: str) -> bool:
    """Tell whether no existing site uses the title yet.

    Titles are compared case-insensitively (both sides lowercased).
    """
    wanted_title = title.lower()
    title_exists = db.exists().where(
        db.func.lower(DbSite.title) == wanted_title
    )

    is_taken = db.session.scalar(select(title_exists))
    return not is_taken
196 1
197
198
def is_server_name_available(server_name: str) -> bool:
    """Tell whether no existing site uses the server name yet.

    Server names are compared case-insensitively (both sides
    lowercased).
    """
    wanted_server_name = server_name.lower()
    server_name_exists = db.exists().where(
        db.func.lower(DbSite.server_name) == wanted_server_name
    )

    is_taken = db.session.scalar(select(server_name_exists))
    return not is_taken
207
208
209
def _db_entity_to_site(db_site: DbSite) -> Site:
    """Convert a site database entity into a `Site` object.

    The IDs of the entity's news channels are collected into a
    frozenset; all other attributes are copied over as they are.
    """
    channel_ids = frozenset(
        {news_channel.id for news_channel in db_site.news_channels}
    )

    return Site(
        id=db_site.id,
        title=db_site.title,
        server_name=db_site.server_name,
        brand_id=db_site.brand_id,
        party_id=db_site.party_id,
        enabled=db_site.enabled,
        user_account_creation_enabled=db_site.user_account_creation_enabled,
        login_enabled=db_site.login_enabled,
        news_channel_ids=channel_ids,
        board_id=db_site.board_id,
        storefront_id=db_site.storefront_id,
        is_intranet=db_site.is_intranet,
        check_in_on_login=db_site.check_in_on_login,
        archived=db_site.archived,
    )
230 1
231 1
232
def _db_entity_to_site_with_brand(db_site: DbSite) -> SiteWithBrand:
    """Convert a site database entity into a `SiteWithBrand` object.

    The entity's `brand` relationship is converted as well.
    """
    plain_site = _db_entity_to_site(db_site)
    brand = brand_service._db_entity_to_brand(db_site.brand)

    # `SiteWithBrand` is built positionally: the site's field values
    # in declaration order, followed by the brand as last argument.
    return SiteWithBrand(*dataclasses.astuple(plain_site), brand)
240
241
242
def add_news_channel(site_id: SiteID, news_channel_id: NewsChannelID) -> None:
    """Associate the news channel with the site.

    Does nothing (and does not commit) if the channel is already
    associated with the site.

    Raises `UnknownSiteIdError` if no site with that ID exists.
    """
    site_entity = _get_db_site(site_id)
    channel_entity = news_channel_service.get_db_channel(news_channel_id)

    if channel_entity not in site_entity.news_channels:
        site_entity.news_channels.append(channel_entity)
        db.session.commit()
252
253
254
def remove_news_channel(
    site_id: SiteID, news_channel_id: NewsChannelID
) -> None:
    """Dissociate the news channel from the site.

    Does nothing (and does not commit) if the channel is not
    associated with the site.

    Raises `UnknownSiteIdError` if no site with that ID exists.
    """
    site_entity = _get_db_site(site_id)
    channel_entity = news_channel_service.get_db_channel(news_channel_id)

    if channel_entity in site_entity.news_channels:
        site_entity.news_channels.remove(channel_entity)
        db.session.commit()
266