Passed
Push — main ( b25e28...d471db )
by Jochen
04:48
created

byceps.services.news.service.unpublish_item()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 3
dl 0
loc 13
ccs 5
cts 6
cp 0.8333
crap 2.0185
rs 9.95
c 0
b 0
f 0
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2022 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
import dataclasses
11 1
from datetime import datetime
12 1
from functools import partial
13 1
from typing import Optional, Union
14
15 1
from sqlalchemy import select
16 1
from sqlalchemy.sql import Select
17
18 1
from ...database import db, paginate, Pagination
19 1
from ...events.news import NewsItemPublished
20 1
from ...typing import UserID
21
22 1
from ..site import service as site_service
23 1
from ..site.transfer.models import SiteID
24 1
from ..user import service as user_service
25 1
from ..user.transfer.models import User
26
27 1
from .channel_service import _db_entity_to_channel
28 1
from . import html_service
29 1
from .dbmodels.channel import Channel as DbChannel
30 1
from .dbmodels.item import (
31
    CurrentVersionAssociation as DbCurrentVersionAssociation,
32
    Item as DbItem,
33
    ItemVersion as DbItemVersion,
34
)
35 1
from . import image_service
36 1
from .transfer.models import (
37
    BodyFormat,
38
    ChannelID,
39
    Headline,
40
    ImageID,
41
    Item,
42
    ItemID,
43
    ItemVersionID,
44
)
45
46
47 1
def create_item(
48
    channel_id: ChannelID,
49
    slug: str,
50
    creator_id: UserID,
51
    title: str,
52
    body: str,
53
    body_format: BodyFormat,
54
    *,
55
    image_url_path: Optional[str] = None,
56
) -> Item:
57
    """Create a news item, a version, and set the version as the item's
58
    current one.
59
    """
60 1
    db_item = DbItem(channel_id, slug)
61 1
    db.session.add(db_item)
62
63 1
    db_version = _create_version(
64
        db_item,
65
        creator_id,
66
        title,
67
        body,
68
        body_format,
69
        image_url_path=image_url_path,
70
    )
71 1
    db.session.add(db_version)
72
73 1
    db_current_version_association = DbCurrentVersionAssociation(
74
        db_item, db_version
75
    )
76 1
    db.session.add(db_current_version_association)
77
78 1
    db.session.commit()
79
80 1
    return _db_entity_to_item(db_item)
81
82
83 1
def update_item(
84
    item_id: ItemID,
85
    slug: str,
86
    creator_id: UserID,
87
    title: str,
88
    body: str,
89
    body_format: BodyFormat,
90
    *,
91
    image_url_path: Optional[str] = None,
92
) -> Item:
93
    """Update a news item by creating a new version of it and setting
94
    the new version as the current one.
95
    """
96
    db_item = _get_db_item(item_id)
97
98
    db_item.slug = slug
99
100
    db_version = _create_version(
101
        db_item,
102
        creator_id,
103
        title,
104
        body,
105
        body_format,
106
        image_url_path=image_url_path,
107
    )
108
    db.session.add(db_version)
109
110
    db_item.current_version = db_version
111
112
    db.session.commit()
113
114
    return _db_entity_to_item(db_item)
115
116
117 1
def _create_version(
118
    db_item: DbItem,
119
    creator_id: UserID,
120
    title: str,
121
    body: str,
122
    body_format: BodyFormat,
123
    *,
124
    image_url_path: Optional[str] = None,
125
) -> DbItemVersion:
126 1
    db_version = DbItemVersion(db_item, creator_id, title, body, body_format)
127
128 1
    if image_url_path:
129 1
        db_version.image_url_path = image_url_path
130
131 1
    return db_version
132
133
134 1
def set_featured_image(item_id: ItemID, image_id: ImageID) -> None:
135
    """Set an image as featured image."""
136
    db_item = _get_db_item(item_id)
137
138
    db_item.featured_image_id = image_id
139
    db.session.commit()
140
141
142 1
def publish_item(
143
    item_id: ItemID,
144
    *,
145
    publish_at: Optional[datetime] = None,
146
    initiator_id: Optional[UserID] = None,
147
) -> NewsItemPublished:
148
    """Publish a news item."""
149 1
    db_item = _get_db_item(item_id)
150
151 1
    if db_item.published:
152
        raise ValueError('News item has already been published')
153
154 1
    now = datetime.utcnow()
155 1
    if publish_at is None:
156 1
        publish_at = now
157
158
    initiator: Optional[User]
159 1
    if initiator_id is not None:
160 1
        initiator = user_service.get_user(initiator_id)
161
    else:
162 1
        initiator = None
163
164 1
    db_item.published_at = publish_at
165 1
    db.session.commit()
166
167 1
    item = _db_entity_to_item(db_item)
168
169 1
    if item.channel.announcement_site_id is not None:
170 1
        site = site_service.get_site(SiteID(item.channel.announcement_site_id))
171 1
        external_url = f'https://{site.server_name}/news/{item.slug}'
172
    else:
173 1
        external_url = None
174
175 1
    return NewsItemPublished(
176
        occurred_at=now,
177
        initiator_id=initiator.id if initiator else None,
178
        initiator_screen_name=initiator.screen_name if initiator else None,
179
        item_id=item.id,
180
        channel_id=item.channel.id,
181
        published_at=item.published_at,
182
        title=item.title,
183
        external_url=external_url,
184
    )
185
186
187 1
def unpublish_item(
188
    item_id: ItemID,
189
    *,
190
    initiator_id: Optional[UserID] = None,
191
) -> NewsItemPublished:
192
    """Unublish a news item."""
193 1
    db_item = _get_db_item(item_id)
194
195 1
    if not db_item.published:
196
        raise ValueError('News item is not published')
197
198 1
    db_item.published_at = None
199 1
    db.session.commit()
200
201
202 1
def delete_item(item_id: ItemID) -> None:
203
    """Delete a news item and its versions."""
204 1
    db.session.query(DbCurrentVersionAssociation) \
205
        .filter_by(item_id=item_id) \
206
        .delete()
207
208 1
    db.session.query(DbItemVersion) \
209
        .filter_by(item_id=item_id) \
210
        .delete()
211
212 1
    db.session.query(DbItem) \
213
        .filter_by(id=item_id) \
214
        .delete()
215
216 1
    db.session.commit()
217
218
219 1
def find_item(item_id: ItemID) -> Optional[Item]:
220
    """Return the item with that id, or `None` if not found."""
221 1
    db_item = _find_db_item(item_id)
222
223 1
    if db_item is None:
224
        return None
225
226 1
    return _db_entity_to_item(db_item)
227
228
229 1
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
230
    """Return the item with that id, or `None` if not found."""
231 1
    return db.session.query(DbItem) \
232
        .options(
233
            db.joinedload(DbItem.channel),
234
            db.joinedload(DbItem.images)
235
        ) \
236
        .get(item_id)
237
238
239 1
def _get_db_item(item_id: ItemID) -> DbItem:
240
    """Return the item with that id, or raise an exception."""
241 1
    db_item = _find_db_item(item_id)
242
243 1
    if db_item is None:
244
        raise ValueError(f'Unknown news item ID "{item_id}".')
245
246 1
    return db_item
247
248
249 1
def find_aggregated_item_by_slug(
250
    channel_ids: set[ChannelID], slug: str, *, published_only: bool = False
251
) -> Optional[Item]:
252
    """Return the news item identified by that slug in one of the given
253
    channels, or `None` if not found.
254
    """
255 1
    query = db.session \
256
        .query(DbItem) \
257
        .filter(DbItem.channel_id.in_(channel_ids)) \
258
        .options(
259
            db.joinedload(DbItem.channel),
260
            db.joinedload(DbItem.current_version_association)
261
                .joinedload(DbCurrentVersionAssociation.version),
262
            db.joinedload(DbItem.images)
263
        ) \
264
        .filter_by(slug=slug)
265
266 1
    if published_only:
267 1
        query = query.filter(DbItem.published_at <= datetime.utcnow())
268
269 1
    db_item = query.one_or_none()
270
271 1
    if db_item is None:
272 1
        return None
273
274 1
    return _db_entity_to_item(db_item, render_body=True)
275
276
277 1
def get_aggregated_items_paginated(
278
    channel_ids: set[ChannelID],
279
    page: int,
280
    items_per_page: int,
281
    *,
282
    published_only: bool = False,
283
) -> Pagination:
284
    """Return the news items to show on the specified page."""
285 1
    items_query = _get_items_query(channel_ids)
286 1
    count_query = _get_count_query(channel_ids)
287
288 1
    if published_only:
289 1
        items_query = items_query \
290
            .filter(DbItem.published_at <= datetime.utcnow())
291 1
        count_query = count_query \
292
            .filter(DbItem.published_at <= datetime.utcnow())
293
294 1
    item_mapper = partial(_db_entity_to_item, render_body=True)
295
296 1
    return paginate(
297
        items_query,
298
        count_query,
299
        page,
300
        items_per_page,
301
        scalar_result=True,
302
        unique_result=True,
303
        item_mapper=item_mapper,
304
    )
305
306
307 1
def get_items_paginated(
308
    channel_ids: set[ChannelID], page: int, items_per_page: int
309
) -> Pagination:
310
    """Return the news items to show on the specified page."""
311 1
    items_query = _get_items_query(channel_ids)
312 1
    count_query = _get_count_query(channel_ids)
313
314 1
    return paginate(
315
        items_query,
316
        count_query,
317
        page,
318
        items_per_page,
319
        scalar_result=True,
320
        unique_result=True,
321
    )
322
323
324 1
def get_recent_headlines(
325
    channel_ids: Union[frozenset[ChannelID], set[ChannelID]], limit: int
326
) -> list[Headline]:
327
    """Return the most recent headlines."""
328
    db_items = db.session \
329
        .query(DbItem) \
330
        .filter(DbItem.channel_id.in_(channel_ids)) \
331
        .options(
332
            db.joinedload(DbItem.current_version_association)
333
                .joinedload(DbCurrentVersionAssociation.version)
334
        ) \
335
        .filter(DbItem.published_at <= datetime.utcnow()) \
336
        .order_by(DbItem.published_at.desc()) \
337
        .limit(limit) \
338
        .all()
339
340
    return [
341
        Headline(
342
            slug=db_item.slug,
343
            published_at=db_item.published_at,
344
            title=db_item.current_version.title,
345
        )
346
        for db_item in db_items
347
    ]
348
349
350 1
def _get_items_query(channel_ids: set[ChannelID]) -> Select:
351 1
    return select(DbItem) \
352
        .filter(DbItem.channel_id.in_(channel_ids)) \
353
        .options(
354
            db.joinedload(DbItem.channel),
355
            db.joinedload(DbItem.current_version_association)
356
                .joinedload(DbCurrentVersionAssociation.version),
357
            db.joinedload(DbItem.images)
358
        ) \
359
        .order_by(DbItem.published_at.desc())
360
361
362 1
def _get_count_query(channel_ids: set[ChannelID]) -> Select:
363 1
    return select(db.func.count(DbItem.id)) \
364
        .filter(DbItem.channel_id.in_(channel_ids))
365
366
367 1
def get_item_versions(item_id: ItemID) -> list[DbItemVersion]:
368
    """Return all item versions, sorted from most recent to oldest."""
369 1
    return db.session \
370
        .query(DbItemVersion) \
371
        .filter_by(item_id=item_id) \
372
        .order_by(DbItemVersion.created_at.desc()) \
373
        .all()
374
375
376 1
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
377
    """Return the item's current version."""
378 1
    db_item = _get_db_item(item_id)
379
380 1
    return db_item.current_version
381
382
383 1
def find_item_version(version_id: ItemVersionID) -> DbItemVersion:
384
    """Return the item version with that ID, or `None` if not found."""
385 1
    return db.session.get(DbItemVersion, version_id)
386
387
388 1
def has_channel_items(channel_id: ChannelID) -> bool:
389
    """Return `True` if the channel contains items."""
390
    return db.session \
391
        .query(
392
            db.session
393
                .query(DbItem)
394
                .join(DbChannel)
395
                .filter(DbChannel.id == channel_id)
396
                .exists()
397
        ) \
398
        .scalar()
399
400
401 1
def get_item_count_by_channel_id() -> dict[ChannelID, int]:
402
    """Return news item count (including 0) per channel, indexed by
403
    channel ID.
404
    """
405 1
    channel_ids_and_item_counts = db.session \
406
        .query(
407
            DbChannel.id,
408
            db.func.count(DbItem.id)
409
        ) \
410
        .outerjoin(DbItem) \
411
        .group_by(DbChannel.id) \
412
        .all()
413
414 1
    return dict(channel_ids_and_item_counts)
415
416
417 1
def _db_entity_to_item(
418
    db_item: DbItem, *, render_body: Optional[bool] = False
419
) -> Item:
420 1
    channel = _db_entity_to_channel(db_item.channel)
421
422 1
    image_url_path = _assemble_image_url_path(db_item)
423 1
    images = [
424
        image_service._db_entity_to_image(image, channel.id)
425
        for image in db_item.images
426
    ]
427
428 1
    item = Item(
429
        id=db_item.id,
430
        channel=channel,
431
        slug=db_item.slug,
432
        published_at=db_item.published_at,
433
        published=db_item.published_at is not None,
434
        title=db_item.current_version.title,
435
        body=db_item.current_version.body,
436
        body_format=db_item.current_version.body_format,
437
        image_url_path=image_url_path,
438
        images=images,
439
        featured_image_id=db_item.featured_image_id,
440
    )
441
442 1
    if render_body:
443 1
        rendered_body = _render_body(item)
444 1
        item = dataclasses.replace(item, body=rendered_body)
445
446 1
    return item
447
448
449 1
def _assemble_image_url_path(db_item: DbItem) -> Optional[str]:
450 1
    url_path = db_item.current_version.image_url_path
451
452 1
    if not url_path:
453 1
        return None
454
455 1
    return f'/data/global/news_channels/{db_item.channel_id}/{url_path}'
456
457
458 1
def _render_body(item: Item) -> Optional[str]:
459
    """Render body text to HTML."""
460 1
    try:
461 1
        return html_service.render_body(item, item.body, item.body_format)
462
    except Exception as e:
463
        return None  # Not the best error indicator.
464