byceps.services.news.service   A
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 397
Duplicated Lines 0 %

Test Coverage

Coverage 85.71%

Importance

Changes 0
Metric Value
eloc 251
dl 0
loc 397
ccs 108
cts 126
cp 0.8571
rs 9.52
c 0
b 0
f 0
wmc 36

22 Functions

Rating   Name   Duplication   Size   Complexity  
A _db_entity_to_item() 0 30 2
A find_item() 0 8 2
A get_items_paginated() 0 6 1
A get_item_versions() 0 7 1
A _render_body() 0 6 2
A get_recent_headlines() 0 23 1
A get_current_item_version() 0 5 1
A _create_version() 0 14 2
A delete_item() 0 15 1
A find_aggregated_item_by_slug() 0 26 3
A get_aggregated_items_paginated() 0 16 2
B publish_item() 0 36 6
A set_featured_image() 0 6 1
A _assemble_image_url_path() 0 7 2
A find_item_version() 0 3 1
A _get_items_query() 0 11 1
A get_item_count_by_channel_id() 0 14 1
A _find_db_item() 0 8 1
A update_item() 0 26 1
A _get_db_item() 0 8 2
A create_item() 0 26 1
A has_channel_items() 0 11 1
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
import dataclasses
11 1
from datetime import datetime
12 1
from functools import partial
13 1
from typing import Optional, Sequence
14
15 1
from ...database import db, paginate, Pagination, Query
16 1
from ...events.news import NewsItemPublished
17 1
from ...typing import UserID
18
19 1
from ..user import service as user_service
20 1
from ..user.transfer.models import User
21
22 1
from .channel_service import _db_entity_to_channel
23 1
from . import html_service
24 1
from .dbmodels.channel import Channel as DbChannel
25 1
from .dbmodels.item import (
26
    CurrentVersionAssociation as DbCurrentVersionAssociation,
27
    Item as DbItem,
28
    ItemVersion as DbItemVersion,
29
)
30 1
from . import image_service
31 1
from .transfer.models import (
32
    ChannelID,
33
    Headline,
34
    ImageID,
35
    Item,
36
    ItemID,
37
    ItemVersionID,
38
)
39
40
41 1
def create_item(
    channel_id: ChannelID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> Item:
    """Create a news item, a version, and set the version as the item's
    current one.

    The item, its first version, and the association marking that
    version as current are added to the session and persisted with a
    single commit.

    :param channel_id: ID of the channel the item is created in.
    :param slug: Slug of the item.
    :param creator_id: ID of the user recorded as creator of the version.
    :param title: Title stored in the version.
    :param body: Body text stored in the version (returned unrendered).
    :param image_url_path: Optional image URL path; only stored on the
        version if it is non-empty.
    :return: The created item as a transfer object.
    """
    item = DbItem(channel_id, slug)
    db.session.add(item)

    version = _create_version(
        item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(version)

    current_version_association = DbCurrentVersionAssociation(item, version)
    db.session.add(current_version_association)

    db.session.commit()

    return _db_entity_to_item(item)
67
68
69 1
def update_item(
    item_id: ItemID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> Item:
    """Update a news item by creating a new version of it and setting
    the new version as the current one.

    Existing versions are left untouched; only the item's slug and its
    current version change.

    :param item_id: ID of the item to update.
    :param slug: New slug of the item.
    :param creator_id: ID of the user recorded as creator of the new
        version.
    :param title: Title stored in the new version.
    :param body: Body text stored in the new version.
    :param image_url_path: Optional image URL path; only stored on the
        new version if it is non-empty.
    :return: The updated item as a transfer object.
    :raises ValueError: If no item with that ID exists.
    """
    item = _get_db_item(item_id)

    item.slug = slug

    version = _create_version(
        item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(version)

    item.current_version = version

    db.session.commit()

    return _db_entity_to_item(item)
95
96
97 1
def _create_version(
    item: DbItem,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> DbItemVersion:
    """Build a new version for the item without adding it to the session.

    The image URL path is only assigned if it is truthy, so both `None`
    and the empty string leave the version's attribute untouched.
    """
    version = DbItemVersion(item, creator_id, title, body)

    if image_url_path:
        version.image_url_path = image_url_path

    return version
111
112
113 1
def set_featured_image(item_id: ItemID, image_id: ImageID) -> None:
    """Set an image as the item's featured image and commit the change.

    :raises ValueError: If no item with that ID exists.
    """
    db_item = _get_db_item(item_id)

    db_item.featured_image_id = image_id
    db.session.commit()
119
120
121 1
def publish_item(
    item_id: ItemID,
    *,
    publish_at: Optional[datetime] = None,
    initiator_id: Optional[UserID] = None,
) -> NewsItemPublished:
    """Publish a news item and return the corresponding event.

    :param item_id: ID of the item to publish.
    :param publish_at: Point in time to store as the item's publication
        date; defaults to the current UTC time.
    :param initiator_id: ID of the user who triggered the publication,
        if any.
    :return: An event whose `occurred_at` is the time of this call and
        whose `published_at` is the stored publication date.
    :raises ValueError: If no item with that ID exists, or if the item
        is already flagged as published.
    """
    db_item = _get_db_item(item_id)

    if db_item.published:
        raise ValueError('News item has already been published')

    now = datetime.utcnow()
    if publish_at is None:
        publish_at = now

    # The initiator is looked up before the item is modified.
    initiator: Optional[User] = (
        user_service.get_user(initiator_id)
        if initiator_id is not None
        else None
    )

    db_item.published_at = publish_at
    db.session.commit()

    item = _db_entity_to_item(db_item)

    return NewsItemPublished(
        occurred_at=now,
        initiator_id=initiator.id if initiator else None,
        initiator_screen_name=initiator.screen_name if initiator else None,
        item_id=item.id,
        channel_id=item.channel.id,
        published_at=item.published_at,
        title=item.title,
        external_url=item.external_url,
    )
158
159
160 1
def delete_item(item_id: ItemID) -> None:
    """Delete a news item and its versions.

    Three bulk deletes are issued and committed together. No error is
    raised here if the ID matches no rows.
    """
    # Order: current-version association, then versions, then the item
    # itself. NOTE(review): presumably required by foreign key
    # constraints between these tables -- confirm against the models.
    db.session.query(DbCurrentVersionAssociation) \
        .filter_by(item_id=item_id) \
        .delete()

    db.session.query(DbItemVersion) \
        .filter_by(item_id=item_id) \
        .delete()

    db.session.query(DbItem) \
        .filter_by(id=item_id) \
        .delete()

    db.session.commit()
175
176
177 1
def find_item(item_id: ItemID) -> Optional[Item]:
    """Return the item with that id, or `None` if not found.

    The body of the returned item is not rendered to HTML.
    """
    item = _find_db_item(item_id)

    if item is None:
        return None

    return _db_entity_to_item(item)
185
186
187 1
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
    """Return the database entity of the item with that id, or `None`
    if not found.

    The item's channel and images are requested via joined eager
    loading.
    """
    return db.session.query(DbItem) \
        .options(
            db.joinedload(DbItem.channel),
            db.joinedload(DbItem.images)
        ) \
        .get(item_id)
195
196
197 1
def _get_db_item(item_id: ItemID) -> DbItem:
    """Return the database entity of the item with that id.

    :raises ValueError: If no item with that ID exists.
    """
    item = _find_db_item(item_id)

    if item is None:
        raise ValueError(f'Unknown news item ID "{item_id}".')

    return item
205
206
207 1
def find_aggregated_item_by_slug(
    channel_ids: set[ChannelID], slug: str, *, published_only: bool = False
) -> Optional[Item]:
    """Return the news item identified by that slug in one of the given
    channels, or `None` if not found.

    The body of the returned item is rendered to HTML.

    :param channel_ids: IDs of the channels to search in.
    :param slug: Slug of the item to look up.
    :param published_only: If true, only consider items whose
        publication date is not later than the current UTC time.
    """
    if not channel_ids:
        # No channel can match; skip the database round trip.
        return None

    query = db.session \
        .query(DbItem) \
        .filter(DbItem.channel_id.in_(channel_ids)) \
        .options(
            db.joinedload(DbItem.channel),
            db.joinedload(DbItem.current_version_association)
                .joinedload(DbCurrentVersionAssociation.version),
            db.joinedload(DbItem.images)
        ) \
        .filter_by(slug=slug)

    if published_only:
        query = query.filter(DbItem.published_at <= datetime.utcnow())

    # `one_or_none()` raises if more than one row matches, e.g. when
    # several of the given channels contain an item with this slug.
    item = query.one_or_none()

    if item is None:
        return None

    return _db_entity_to_item(item, render_body=True)
233
234
235 1
def get_aggregated_items_paginated(
    channel_ids: set[ChannelID],
    page: int,
    items_per_page: int,
    *,
    published_only: bool = False,
) -> Pagination:
    """Return the news items to show on the specified page.

    Each database entity on the page is converted to a transfer object
    with its body rendered to HTML.

    :param channel_ids: IDs of the channels to collect items from.
    :param page: Number of the page to return.
    :param items_per_page: Maximum number of items per page.
    :param published_only: If true, only include items whose
        publication date is not later than the current UTC time.
    """
    query = _get_items_query(channel_ids)

    if published_only:
        query = query.filter(DbItem.published_at <= datetime.utcnow())

    item_mapper = partial(_db_entity_to_item, render_body=True)

    return paginate(query, page, items_per_page, item_mapper=item_mapper)
251
252
253 1
def get_items_paginated(
    channel_ids: set[ChannelID], page: int, items_per_page: int
) -> Pagination:
    """Return the news items to show on the specified page.

    No publication filter and no item mapper are applied here; the
    query for the given channels is paginated as-is.
    """
    return _get_items_query(channel_ids) \
        .paginate(page, items_per_page)
259
260
261 1
def get_recent_headlines(
    channel_ids: set[ChannelID], limit: int
) -> list[Headline]:
    """Return the most recent headlines.

    Only items whose publication date is not later than the current
    UTC time are considered. Results are sorted from most recently
    published to oldest.

    :param channel_ids: IDs of the channels to collect headlines from.
    :param limit: Maximum number of headlines to return.
    """
    if not channel_ids:
        # No channel can match; skip the database round trip.
        return []

    items = db.session \
        .query(DbItem) \
        .filter(DbItem.channel_id.in_(channel_ids)) \
        .options(
            db.joinedload(DbItem.current_version_association)
                .joinedload(DbCurrentVersionAssociation.version)
        ) \
        .filter(DbItem.published_at <= datetime.utcnow()) \
        .order_by(DbItem.published_at.desc()) \
        .limit(limit) \
        .all()

    return [
        Headline(
            slug=item.slug,
            published_at=item.published_at,
            title=item.current_version.title,
        )
        for item in items
    ]
285
286
287 1
def _get_items_query(channel_ids: set[ChannelID]) -> Query:
    """Return a query for the items of the given channels.

    The query requests joined eager loading of each item's channel,
    current version, and images, and sorts by publication date in
    descending order. It does not filter on publication state.
    """
    return db.session \
        .query(DbItem) \
        .filter(DbItem.channel_id.in_(channel_ids)) \
        .options(
            db.joinedload(DbItem.channel),
            db.joinedload(DbItem.current_version_association)
                .joinedload(DbCurrentVersionAssociation.version),
            db.joinedload(DbItem.images)
        ) \
        .order_by(DbItem.published_at.desc())
298
299
300 1
def get_item_versions(item_id: ItemID) -> Sequence[DbItemVersion]:
    """Return all item versions, sorted from most recent to oldest.

    Versions are ordered by their creation timestamp. An empty
    sequence is returned if no version references that item ID.
    """
    return db.session \
        .query(DbItemVersion) \
        .filter_by(item_id=item_id) \
        .order_by(DbItemVersion.created_at.desc()) \
        .all()
307
308
309 1
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
    """Return the item's current version.

    :raises ValueError: If no item with that ID exists.
    """
    item = _get_db_item(item_id)

    return item.current_version
314
315
316 1
def find_item_version(version_id: ItemVersionID) -> Optional[DbItemVersion]:
    """Return the item version with that ID, or `None` if not found."""
    return db.session.query(DbItemVersion).get(version_id)
319
320
321 1
def has_channel_items(channel_id: ChannelID) -> bool:
    """Return `True` if the channel contains items.

    Implemented as an `EXISTS` subquery over items joined with their
    channel, so no item rows are fetched.
    """
    return db.session \
        .query(
            db.session
                .query(DbItem)
                .join(DbChannel)
                .filter(DbChannel.id == channel_id)
                .exists()
        ) \
        .scalar()
332
333
334 1
def get_item_count_by_channel_id() -> dict[ChannelID, int]:
    """Return news item count (including 0) per channel, indexed by
    channel ID.
    """
    # The outer join keeps channels without items in the result;
    # counting `DbItem.id` yields 0 for them because the joined item
    # columns are NULL.
    channel_ids_and_item_counts = db.session \
        .query(
            DbChannel.id,
            db.func.count(DbItem.id)
        ) \
        .outerjoin(DbItem) \
        .group_by(DbChannel.id) \
        .all()

    return dict(channel_ids_and_item_counts)
348
349
350 1
def _db_entity_to_item(
    db_item: DbItem, *, render_body: bool = False
) -> Item:
    """Convert a news item database entity to a transfer object.

    Title, body, and image URL path are taken from the item's current
    version. The external URL is the channel's URL prefix followed by
    the item's slug.

    :param db_item: The database entity to convert.
    :param render_body: If true, replace the raw body text with the
        result of rendering it to HTML (which is `None` if rendering
        fails).
    """
    channel = _db_entity_to_channel(db_item.channel)
    external_url = channel.url_prefix + db_item.slug
    image_url_path = _assemble_image_url_path(db_item)
    images = [
        image_service._db_entity_to_image(image, channel.id)
        for image in db_item.images
    ]

    item = Item(
        id=db_item.id,
        channel=channel,
        slug=db_item.slug,
        published_at=db_item.published_at,
        published=db_item.published_at is not None,
        title=db_item.current_version.title,
        body=db_item.current_version.body,
        external_url=external_url,
        image_url_path=image_url_path,
        images=images,
        featured_image_id=db_item.featured_image_id,
    )

    if render_body:
        # Rendering needs the assembled item, so the body is swapped in
        # afterwards on a copy.
        rendered_body = _render_body(item)
        item = dataclasses.replace(item, body=rendered_body)

    return item
380
381
382 1
def _assemble_image_url_path(item: DbItem) -> Optional[str]:
    """Return the URL path of the image of the item's current version.

    The stored path is prefixed with the channel-specific data
    directory. `None` is returned if the current version has no
    (or an empty) image URL path.
    """
    url_path = item.current_version.image_url_path

    if not url_path:
        return None

    return f'/data/global/news_channels/{item.channel_id}/{url_path}'
389
390
391 1
def _render_body(item: Item) -> Optional[str]:
    """Render body text to HTML.

    Rendering is best-effort: any exception raised while rendering is
    logged and `None` is returned instead of propagating the error.
    """
    try:
        return html_service.render_body(item, item.body)
    except Exception:
        # Imported locally so the block does not depend on a new
        # module-level import.
        import logging

        logging.getLogger(__name__).exception(
            'Rendering of news item body failed.'
        )
        return None  # Not the best error indicator.
397