Passed
Push — main ( 7fb18a...baf70b )
by Jochen
04:45
created

byceps.services.news.service.has_channel_items()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 1
dl 0
loc 11
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.95
c 0
b 0
f 0
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
import dataclasses
11 1
from datetime import datetime
12 1
from functools import partial
13 1
from typing import Optional, Sequence
14
15 1
from ...database import db, paginate, Pagination, Query
16 1
from ...events.news import NewsItemPublished
17 1
from ...typing import UserID
18
19 1
from ..user import service as user_service
20
21 1
from .channel_service import _db_entity_to_channel
22 1
from . import html_service
23 1
from .dbmodels.channel import Channel as DbChannel
24 1
from .dbmodels.item import (
25
    CurrentVersionAssociation as DbCurrentVersionAssociation,
26
    Item as DbItem,
27
    ItemVersion as DbItemVersion,
28
)
29 1
from . import image_service
30 1
from .transfer.models import ChannelID, Headline, Item, ItemID, ItemVersionID
31
32
33 1
def create_item(
    channel_id: ChannelID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> Item:
    """Create a news item along with its first version.

    The item, the version, and the association that marks the version
    as the item's current one are added to the session and persisted
    with a single commit.
    """
    db_item = DbItem(channel_id, slug)
    db.session.add(db_item)

    initial_version = _create_version(
        db_item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(initial_version)

    association = DbCurrentVersionAssociation(db_item, initial_version)
    db.session.add(association)

    db.session.commit()

    return _db_entity_to_item(db_item)
59
60
61 1
def update_item(
    item_id: ItemID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> Item:
    """Update a news item.

    The slug is changed in place; title, body, and image URL path go
    into a newly created version which then becomes the item's current
    version.

    Raise `ValueError` (via `_get_db_item`) if the item ID is unknown.
    """
    db_item = _get_db_item(item_id)
    db_item.slug = slug

    new_version = _create_version(
        db_item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(new_version)

    db_item.current_version = new_version
    db.session.commit()

    return _db_entity_to_item(db_item)
87
88
89 1
def _create_version(
    item: DbItem,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> DbItemVersion:
    """Build (but do not persist) a new version for the item.

    The image URL path is only set on the version if a non-empty value
    was given.
    """
    new_version = DbItemVersion(item, creator_id, title, body)

    if not image_url_path:
        return new_version

    new_version.image_url_path = image_url_path
    return new_version
103
104
105 1
def publish_item(
    item_id: ItemID,
    *,
    publish_at: Optional[datetime] = None,
    initiator_id: Optional[UserID] = None,
) -> NewsItemPublished:
    """Publish a news item and return the corresponding event.

    If no publication time is given, the current UTC time is used.

    Raise `ValueError` if the item has already been published (or, via
    `_get_db_item`, if the item ID is unknown).
    """
    db_item = _get_db_item(item_id)

    if db_item.published:
        raise ValueError('News item has already been published')

    now = datetime.utcnow()
    effective_publish_at = now if publish_at is None else publish_at

    # Look up the initiator before committing, as the original flow does.
    initiator = (
        user_service.get_user(initiator_id)
        if initiator_id is not None
        else None
    )

    db_item.published_at = effective_publish_at
    db.session.commit()

    item = _db_entity_to_item(db_item)

    event_fields = dict(
        occurred_at=now,
        initiator_id=initiator.id if initiator else None,
        initiator_screen_name=initiator.screen_name if initiator else None,
        item_id=item.id,
        channel_id=item.channel.id,
        published_at=item.published_at,
        title=item.title,
        external_url=item.external_url,
    )
    return NewsItemPublished(**event_fields)
141
142
143 1
def delete_item(item_id: ItemID) -> None:
    """Delete a news item, its versions, and its current version
    association, then commit.
    """
    # The association and the versions reference the item, so they are
    # removed before the item itself.
    (
        db.session.query(DbCurrentVersionAssociation)
        .filter_by(item_id=item_id)
        .delete()
    )

    (
        db.session.query(DbItemVersion)
        .filter_by(item_id=item_id)
        .delete()
    )

    (
        db.session.query(DbItem)
        .filter_by(id=item_id)
        .delete()
    )

    db.session.commit()
158
159
160 1
def find_item(item_id: ItemID) -> Optional[Item]:
    """Look up the item with that ID and return it as a transfer
    object, or return `None` if no such item exists.
    """
    db_item = _find_db_item(item_id)
    return None if db_item is None else _db_entity_to_item(db_item)
168
169
170 1
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
    """Fetch the database item with that ID (using the `with_channel`
    and `with_images` query options), or return `None` if not found.
    """
    query = DbItem.query.with_channel().with_images()
    return query.get(item_id)
176
177
178 1
def _get_db_item(item_id: ItemID) -> DbItem:
    """Fetch the database item with that ID.

    Raise `ValueError` if no item with that ID exists.
    """
    db_item = _find_db_item(item_id)

    if db_item is not None:
        return db_item

    raise ValueError(f'Unknown news item ID "{item_id}".')
186
187
188 1
def find_aggregated_item_by_slug(
    channel_ids: set[ChannelID], slug: str, *, published_only: bool = False
) -> Optional[Item]:
    """Look up the news item with that slug among the given channels.

    Return the item with its body rendered, or `None` if there is no
    match. With `published_only` set, the query is restricted via
    `published()`.
    """
    items_query = (
        DbItem.query
        .for_channels(channel_ids)
        .with_channel()
        .with_current_version()
        .with_images()
        .filter_by(slug=slug)
    )

    if published_only:
        items_query = items_query.published()

    db_item = items_query.one_or_none()

    return (
        None
        if db_item is None
        else _db_entity_to_item(db_item, render_body=True)
    )
210
211
212 1
def get_aggregated_items_paginated(
    channel_ids: set[ChannelID],
    page: int,
    items_per_page: int,
    *,
    published_only: bool = False,
) -> Pagination:
    """Return one page of news items from the given channels.

    Each database item on the page is mapped to a transfer object with
    its body rendered. With `published_only` set, the query is
    restricted via `published()`.
    """
    base_query = _get_items_query(channel_ids)
    items_query = base_query.published() if published_only else base_query

    to_rendered_item = partial(_db_entity_to_item, render_body=True)

    return paginate(
        items_query, page, items_per_page, item_mapper=to_rendered_item
    )
228
229
230 1
def get_items_paginated(
    channel_ids: set[ChannelID], page: int, items_per_page: int
) -> Pagination:
    """Return one page of the given channels' news items, as produced
    by the query's own `paginate` method (no item mapping is applied).
    """
    items_query = _get_items_query(channel_ids)
    return items_query.paginate(page, items_per_page)
236
237
238 1
def get_recent_headlines(
    channel_ids: set[ChannelID], limit: int
) -> list[Headline]:
    """Return up to `limit` headlines of published items from the given
    channels, most recently published first.
    """
    recent_items_query = (
        DbItem.query
        .for_channels(channel_ids)
        .with_current_version()
        .published()
        .order_by(DbItem.published_at.desc())
        .limit(limit)
    )

    def to_headline(db_item: DbItem) -> Headline:
        return Headline(
            slug=db_item.slug,
            published_at=db_item.published_at,
            title=db_item.current_version.title,
        )

    return [to_headline(db_item) for db_item in recent_items_query.all()]
258
259
260 1
def _get_items_query(channel_ids: set[ChannelID]) -> Query:
    """Build the base query for the given channels' items, ordered by
    publication time, newest first.
    """
    return (
        DbItem.query
        .for_channels(channel_ids)
        .with_channel()
        .with_current_version()
        .with_images()
        .order_by(DbItem.published_at.desc())
    )
267
268
269 1
def get_item_versions(item_id: ItemID) -> Sequence[DbItemVersion]:
    """Return every version of the item, newest (by creation time)
    first.
    """
    versions_query = (
        DbItemVersion.query
        .for_item(item_id)
        .order_by(DbItemVersion.created_at.desc())
    )
    return versions_query.all()
275
276
277 1
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
    """Return the version currently set for the item.

    Raise `ValueError` (via `_get_db_item`) if the item ID is unknown.
    """
    return _get_db_item(item_id).current_version
282
283
284 1
def find_item_version(version_id: ItemVersionID) -> Optional[DbItemVersion]:
    """Return the item version with that ID, or `None` if not found."""
    # `Query.get` yields `None` for an unknown primary key, hence the
    # `Optional` return type (same contract as `_find_db_item`).
    return DbItemVersion.query.get(version_id)
287
288
289 1
def has_channel_items(channel_id: ChannelID) -> bool:
    """Tell whether at least one news item belongs to the channel."""
    # Build an EXISTS clause over the items joined to the channel and
    # select just that clause.
    items_exist = (
        db.session
        .query(DbItem)
        .join(DbChannel)
        .filter(DbChannel.id == channel_id)
        .exists()
    )

    return db.session.query(items_exist).scalar()
300
301
302 1
def get_item_count_by_channel_id() -> dict[ChannelID, int]:
    """Return a mapping from channel ID to the number of news items in
    that channel.

    Channels are outer-joined to items, so channels without any items
    are included with a count of 0.
    """
    counts_query = (
        db.session
        .query(DbChannel.id, db.func.count(DbItem.id))
        .outerjoin(DbItem)
        .group_by(DbChannel.id)
    )

    # Each row is a (channel ID, item count) pair.
    return dict(counts_query.all())
316
317
318 1
def _db_entity_to_item(
    db_item: DbItem, *, render_body: Optional[bool] = False
) -> Item:
    """Convert a database item to an `Item` transfer object.

    Title and body are taken from the item's current version. If
    `render_body` is truthy, the body is replaced with the result of
    `_render_body` (which may be `None`).
    """
    channel = _db_entity_to_channel(db_item.channel)
    external_url = db_item.channel.url_prefix + db_item.slug
    image_url_path = _assemble_image_url_path(db_item)
    images = [
        image_service._db_entity_to_image(db_image, db_item.channel_id)
        for db_image in db_item.images
    ]

    item = Item(
        id=db_item.id,
        channel=channel,
        slug=db_item.slug,
        published_at=db_item.published_at,
        published=db_item.published_at is not None,
        title=db_item.current_version.title,
        body=db_item.current_version.body,
        external_url=external_url,
        image_url_path=image_url_path,
        images=images,
    )

    if not render_body:
        return item

    # Rendering needs the assembled item, so the body is swapped in
    # afterwards on a copy.
    return dataclasses.replace(item, body=_render_body(item))
347
348
349 1
def _assemble_image_url_path(item: DbItem) -> Optional[str]:
350 1
    url_path = item.current_version.image_url_path
351
352 1
    if not url_path:
353 1
        return None
354
355 1
    return f'/data/global/news_channels/{item.channel_id}/{url_path}'
356
357
358 1
def _render_body(item: Item) -> Optional[str]:
    """Render body text to HTML.

    Rendering is best-effort: if it fails for any reason, the error is
    logged (including the traceback) and `None` is returned instead of
    propagating the exception.
    """
    try:
        return html_service.render_body(item.body, item.channel.id, item.images)
    except Exception:
        # Imported locally so this block does not depend on changes to
        # the module's import section.
        import logging

        logging.getLogger(__name__).exception(
            'Rendering the body of news item "%s" failed.', item.id
        )
        return None  # Not the best error indicator.
364