1
|
|
|
""" |
2
|
|
|
byceps.services.site.site_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2014-2024 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from collections.abc import Callable |
10
|
|
|
import dataclasses |
11
|
1 |
|
|
12
|
1 |
|
from sqlalchemy import delete, select |
13
|
|
|
|
14
|
1 |
|
from byceps.database import db |
15
|
|
|
from byceps.services.board.models import BoardID |
16
|
1 |
|
from byceps.services.brand import brand_service |
17
|
1 |
|
from byceps.services.brand.models import BrandID |
18
|
1 |
|
from byceps.services.news import news_channel_service |
19
|
1 |
|
from byceps.services.news.models import NewsChannelID |
20
|
1 |
|
from byceps.services.party.models import PartyID |
21
|
1 |
|
from byceps.services.shop.storefront.models import StorefrontID |
22
|
1 |
|
|
23
|
1 |
|
from .dbmodels import DbSite, DbSiteSetting |
24
|
|
|
from .models import Site, SiteID, SiteWithBrand |
25
|
1 |
|
|
26
|
1 |
|
|
27
|
|
|
class UnknownSiteIdError(Exception): |
28
|
|
|
pass |
29
|
1 |
|
|
30
|
1 |
|
|
31
|
|
|
def create_site( |
32
|
|
|
site_id: SiteID, |
33
|
1 |
|
title: str, |
34
|
|
|
server_name: str, |
35
|
|
|
brand_id: BrandID, |
36
|
|
|
*, |
37
|
|
|
enabled: bool = False, |
38
|
|
|
user_account_creation_enabled: bool = False, |
39
|
|
|
login_enabled: bool = False, |
40
|
|
|
party_id: PartyID | None = None, |
41
|
|
|
board_id: BoardID | None = None, |
42
|
|
|
storefront_id: StorefrontID | None = None, |
43
|
|
|
is_intranet: bool = False, |
44
|
|
|
check_in_on_login: bool = False, |
45
|
|
|
) -> Site: |
46
|
|
|
"""Create a site for that party.""" |
47
|
|
|
db_site = DbSite( |
48
|
|
|
site_id, |
49
|
1 |
|
title, |
50
|
|
|
server_name, |
51
|
|
|
brand_id, |
52
|
|
|
enabled, |
53
|
|
|
user_account_creation_enabled, |
54
|
|
|
login_enabled, |
55
|
|
|
party_id=party_id, |
56
|
|
|
board_id=board_id, |
57
|
|
|
storefront_id=storefront_id, |
58
|
|
|
is_intranet=is_intranet, |
59
|
|
|
check_in_on_login=check_in_on_login, |
60
|
|
|
) |
61
|
|
|
|
62
|
|
|
db.session.add(db_site) |
63
|
|
|
db.session.commit() |
64
|
1 |
|
|
65
|
1 |
|
return _db_entity_to_site(db_site) |
66
|
|
|
|
67
|
1 |
|
|
68
|
|
|
def update_site( |
69
|
|
|
site_id: SiteID, |
70
|
1 |
|
title: str, |
71
|
|
|
server_name: str, |
72
|
|
|
party_id: PartyID | None, |
73
|
|
|
enabled: bool, |
74
|
|
|
user_account_creation_enabled: bool, |
75
|
|
|
login_enabled: bool, |
76
|
|
|
board_id: BoardID | None, |
77
|
|
|
storefront_id: StorefrontID | None, |
78
|
|
|
is_intranet: bool, |
79
|
|
|
check_in_on_login: bool, |
80
|
|
|
archived: bool, |
81
|
|
|
) -> Site: |
82
|
|
|
"""Update the site.""" |
83
|
|
|
db_site = _get_db_site(site_id) |
84
|
|
|
|
85
|
|
|
db_site.title = title |
86
|
|
|
db_site.server_name = server_name |
87
|
|
|
db_site.party_id = party_id |
88
|
|
|
db_site.enabled = enabled |
89
|
|
|
db_site.user_account_creation_enabled = user_account_creation_enabled |
90
|
|
|
db_site.login_enabled = login_enabled |
91
|
|
|
db_site.board_id = board_id |
92
|
|
|
db_site.storefront_id = storefront_id |
93
|
|
|
db_site.is_intranet = is_intranet |
94
|
|
|
db_site.check_in_on_login = check_in_on_login |
95
|
|
|
db_site.archived = archived |
96
|
|
|
|
97
|
|
|
db.session.commit() |
98
|
|
|
|
99
|
|
|
return _db_entity_to_site(db_site) |
100
|
|
|
|
101
|
|
|
|
102
|
|
|
def delete_site(site_id: SiteID) -> None: |
103
|
|
|
"""Delete a site.""" |
104
|
|
|
db.session.execute(delete(DbSiteSetting).filter_by(site_id=site_id)) |
105
|
|
|
db.session.execute(delete(DbSite).filter_by(id=site_id)) |
106
|
1 |
|
db.session.commit() |
107
|
|
|
|
108
|
1 |
|
|
109
|
1 |
|
def _find_db_site(site_id: SiteID) -> DbSite | None: |
110
|
1 |
|
return db.session.get(DbSite, site_id) |
111
|
|
|
|
112
|
|
|
|
113
|
1 |
|
def _get_db_site(site_id: SiteID) -> DbSite: |
114
|
1 |
|
db_site = _find_db_site(site_id) |
115
|
|
|
|
116
|
|
|
if db_site is None: |
117
|
1 |
|
raise UnknownSiteIdError(site_id) |
118
|
1 |
|
|
119
|
|
|
return db_site |
120
|
1 |
|
|
121
|
|
|
|
122
|
|
|
def find_site(site_id: SiteID) -> Site | None: |
123
|
1 |
|
"""Return the site with that ID, or `None` if not found.""" |
124
|
|
|
db_site = _find_db_site(site_id) |
125
|
|
|
|
126
|
1 |
|
if db_site is None: |
127
|
|
|
return None |
128
|
1 |
|
|
129
|
|
|
return _db_entity_to_site(db_site) |
130
|
1 |
|
|
131
|
1 |
|
|
132
|
|
|
def get_site(site_id: SiteID) -> Site: |
133
|
1 |
|
"""Return the site with that ID.""" |
134
|
|
|
db_site = _get_db_site(site_id) |
135
|
|
|
return _db_entity_to_site(db_site) |
136
|
1 |
|
|
137
|
|
|
|
138
|
1 |
|
def get_all_sites() -> set[Site]: |
139
|
1 |
|
"""Return all sites.""" |
140
|
|
|
db_sites = db.session.scalars(select(DbSite)).all() |
141
|
|
|
|
142
|
1 |
|
return {_db_entity_to_site(db_site) for db_site in db_sites} |
143
|
|
|
|
144
|
1 |
|
|
145
|
|
|
def get_sites(site_ids: set[SiteID]) -> list[Site]: |
146
|
1 |
|
"""Return the sites with those IDs.""" |
147
|
|
|
if not site_ids: |
148
|
|
|
return [] |
149
|
1 |
|
|
150
|
|
|
db_sites = db.session.scalars( |
151
|
1 |
|
select(DbSite).filter(DbSite.id.in_(site_ids)) |
152
|
|
|
).all() |
153
|
|
|
|
154
|
1 |
|
return [_db_entity_to_site(db_site) for db_site in db_sites] |
155
|
|
|
|
156
|
|
|
|
157
|
|
|
def get_sites_for_brand(brand_id: BrandID) -> set[Site]: |
158
|
1 |
|
"""Return the sites for that brand.""" |
159
|
|
|
db_sites = db.session.scalars( |
160
|
|
|
select(DbSite).filter_by(brand_id=brand_id) |
161
|
1 |
|
).all() |
162
|
|
|
|
163
|
1 |
|
return {_db_entity_to_site(db_site) for db_site in db_sites} |
164
|
|
|
|
165
|
|
|
|
166
|
|
|
def get_current_sites( |
167
|
1 |
|
brand_id: BrandID | None = None, *, include_brands: bool = False |
168
|
|
|
) -> set[Site | SiteWithBrand]: |
169
|
|
|
"""Return all "current" (i.e. enabled and not archived) sites.""" |
170
|
1 |
|
stmt = select(DbSite) |
171
|
|
|
|
172
|
|
|
if brand_id is not None: |
173
|
|
|
stmt = stmt.filter_by(brand_id=brand_id) |
174
|
1 |
|
|
175
|
|
|
if include_brands: |
176
|
1 |
|
stmt = stmt.options(db.joinedload(DbSite.brand)) |
177
|
1 |
|
|
178
|
|
|
stmt = stmt.filter_by(enabled=True).filter_by(archived=False) |
179
|
1 |
|
|
180
|
1 |
|
db_sites = db.session.scalars(stmt).unique().all() |
181
|
|
|
|
182
|
1 |
|
transform: Callable[[DbSite], Site | SiteWithBrand] |
183
|
|
|
if include_brands: |
184
|
1 |
|
transform = _db_entity_to_site_with_brand |
185
|
|
|
else: |
186
|
|
|
transform = _db_entity_to_site |
187
|
1 |
|
|
188
|
1 |
|
return {transform(db_site) for db_site in db_sites} |
189
|
|
|
|
190
|
|
|
|
191
|
|
|
def is_title_available(title: str) -> bool: |
192
|
1 |
|
"""Check if the title is unused.""" |
193
|
|
|
return not db.session.scalar( |
194
|
|
|
select(db.exists().where(db.func.lower(DbSite.title) == title.lower())) |
195
|
1 |
|
) |
196
|
1 |
|
|
197
|
|
|
|
198
|
|
|
def is_server_name_available(server_name: str) -> bool: |
199
|
|
|
"""Check if the server name is unused.""" |
200
|
1 |
|
return not db.session.scalar( |
201
|
|
|
select( |
202
|
|
|
db.exists().where( |
203
|
|
|
db.func.lower(DbSite.server_name) == server_name.lower() |
204
|
|
|
) |
205
|
|
|
) |
206
|
|
|
) |
207
|
|
|
|
208
|
|
|
|
209
|
|
|
def _db_entity_to_site(db_site: DbSite) -> Site: |
210
|
|
|
news_channel_ids = frozenset( |
211
|
|
|
channel.id for channel in db_site.news_channels |
212
|
|
|
) |
213
|
|
|
|
214
|
|
|
return Site( |
215
|
|
|
id=db_site.id, |
216
|
|
|
title=db_site.title, |
217
|
|
|
server_name=db_site.server_name, |
218
|
1 |
|
brand_id=db_site.brand_id, |
219
|
1 |
|
party_id=db_site.party_id, |
220
|
1 |
|
enabled=db_site.enabled, |
221
|
|
|
user_account_creation_enabled=db_site.user_account_creation_enabled, |
222
|
1 |
|
login_enabled=db_site.login_enabled, |
223
|
1 |
|
news_channel_ids=news_channel_ids, |
224
|
|
|
board_id=db_site.board_id, |
225
|
1 |
|
storefront_id=db_site.storefront_id, |
226
|
|
|
is_intranet=db_site.is_intranet, |
227
|
|
|
check_in_on_login=db_site.check_in_on_login, |
228
|
1 |
|
archived=db_site.archived, |
229
|
|
|
) |
230
|
1 |
|
|
231
|
1 |
|
|
232
|
|
|
def _db_entity_to_site_with_brand(db_site: DbSite) -> SiteWithBrand: |
233
|
1 |
|
site = _db_entity_to_site(db_site) |
234
|
1 |
|
brand = brand_service._db_entity_to_brand(db_site.brand) |
235
|
|
|
|
236
|
|
|
site_tuple = dataclasses.astuple(site) |
237
|
1 |
|
brand_tuple = (brand,) |
238
|
|
|
|
239
|
|
|
return SiteWithBrand(*(site_tuple + brand_tuple)) |
240
|
|
|
|
241
|
|
|
|
242
|
|
|
def add_news_channel(site_id: SiteID, news_channel_id: NewsChannelID) -> None: |
243
|
|
|
"""Add the news channel to the site.""" |
244
|
|
|
db_site = _get_db_site(site_id) |
245
|
|
|
news_channel = news_channel_service.get_db_channel(news_channel_id) |
246
|
|
|
|
247
|
|
|
if news_channel in db_site.news_channels: |
248
|
|
|
return |
249
|
|
|
|
250
|
|
|
db_site.news_channels.append(news_channel) |
251
|
|
|
db.session.commit() |
252
|
|
|
|
253
|
|
|
|
254
|
|
|
def remove_news_channel( |
255
|
|
|
site_id: SiteID, news_channel_id: NewsChannelID |
256
|
|
|
) -> None: |
257
|
|
|
"""Remove the news channel from the site.""" |
258
|
|
|
db_site = _get_db_site(site_id) |
259
|
|
|
news_channel = news_channel_service.get_db_channel(news_channel_id) |
260
|
|
|
|
261
|
|
|
if news_channel not in db_site.news_channels: |
262
|
|
|
return |
263
|
|
|
|
264
|
|
|
db_site.news_channels.remove(news_channel) |
265
|
|
|
db.session.commit() |
266
|
|
|
|