Passed
Push — main ( fafd4d...2bc15d )
by Jochen
06:53
created

byceps.services.news.news_item_service   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 478
Duplicated Lines 0 %

Test Coverage

Coverage 82.99%

Importance

Changes 0
Metric Value
eloc 301
dl 0
loc 478
ccs 122
cts 147
cp 0.8299
rs 9.0399
c 0
b 0
f 0
wmc 42

25 Functions

Rating   Name   Duplication   Size   Complexity  
A update_item() 0 32 1
A create_item() 0 34 1
A _find_db_item() 0 13 1
A find_aggregated_item_by_slug() 0 28 3
A delete_item() 0 12 1
A find_item() 0 8 2
B publish_item() 0 42 7
A set_featured_image() 0 6 1
A unpublish_item() 0 13 2
A _create_version() 0 17 2
A _get_db_item() 0 8 2
A _assemble_image_url_path() 0 7 2
A get_aggregated_items_paginated() 0 17 2
A _get_items_stmt() 0 12 1
A get_recent_headlines() 0 22 1
A has_channel_items() 0 8 1
A get_current_item_version() 0 5 1
A _db_entity_to_item() 0 30 2
A get_item_versions() 0 7 1
A get_headlines_paginated() 0 25 2
A get_items_paginated() 0 7 1
A get_item_count_by_channel_id() 0 11 1
A find_item_version() 0 3 1
A _render_body() 0 6 2
A _db_entity_to_headline() 0 6 1

How to fix   Complexity   

Complexity

Complex classes like byceps.services.news.news_item_service often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
byceps.services.news.news_item_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2014-2023 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
import dataclasses
11 1
from datetime import datetime
12 1
from functools import partial
13 1
from typing import Optional
14
15 1
from sqlalchemy import delete, select
16 1
from sqlalchemy.sql import Select
17
18 1
from ...database import db, paginate, Pagination
19 1
from ...events.news import NewsItemPublished
20 1
from ...typing import UserID
21
22 1
from ..site.models import SiteID
23 1
from ..site import site_service
24 1
from ..user.models.user import User
25 1
from ..user import user_service
26
27 1
from .dbmodels.channel import DbNewsChannel
28 1
from .dbmodels.item import (
29
    DbCurrentNewsItemVersionAssociation,
30
    DbNewsItem,
31
    DbNewsItemVersion,
32
)
33 1
from .models import (
34
    BodyFormat,
35
    NewsChannelID,
36
    NewsHeadline,
37
    NewsImageID,
38
    NewsItem,
39
    NewsItemID,
40
    NewsItemVersionID,
41
)
42 1
from . import news_channel_service, news_html_service, news_image_service
43
44
45 1
def create_item(
    channel_id: NewsChannelID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    body_format: BodyFormat,
    *,
    image_url_path: Optional[str] = None,
) -> NewsItem:
    """Create a news item along with its initial version.

    The initial version is linked to the item as its current version,
    and all three new records are persisted with a single commit.
    """
    new_item = DbNewsItem(channel_id, slug)
    db.session.add(new_item)

    initial_version = _create_version(
        new_item,
        creator_id,
        title,
        body,
        body_format,
        image_url_path=image_url_path,
    )
    db.session.add(initial_version)

    # The association record is what marks the version as "current".
    db.session.add(
        DbCurrentNewsItemVersionAssociation(new_item, initial_version)
    )

    db.session.commit()

    return _db_entity_to_item(new_item)
79
80
81 1
def update_item(
    item_id: NewsItemID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    body_format: BodyFormat,
    *,
    image_url_path: Optional[str] = None,
) -> NewsItem:
    """Store new content for an existing news item.

    The item's slug is overwritten, a fresh version holding the given
    content is added, and that version becomes the item's current one.

    Raise `ValueError` if no item with that ID exists.
    """
    item_to_update = _get_db_item(item_id)
    item_to_update.slug = slug

    new_version = _create_version(
        item_to_update,
        creator_id,
        title,
        body,
        body_format,
        image_url_path=image_url_path,
    )
    db.session.add(new_version)
    item_to_update.current_version = new_version

    db.session.commit()

    return _db_entity_to_item(item_to_update)
113
114
115 1
def _create_version(
    db_item: DbNewsItem,
    creator_id: UserID,
    title: str,
    body: str,
    body_format: BodyFormat,
    *,
    image_url_path: Optional[str] = None,
) -> DbNewsItemVersion:
    """Build a new (not yet persisted) version for the given item.

    The image URL path is only set on the version if a non-empty value
    was given.
    """
    version = DbNewsItemVersion(db_item, creator_id, title, body, body_format)

    if not image_url_path:
        return version

    version.image_url_path = image_url_path
    return version
132
133
134 1
def set_featured_image(item_id: NewsItemID, image_id: NewsImageID) -> None:
    """Mark the given image as the news item's featured image.

    Raise `ValueError` if no item with that ID exists.
    """
    _get_db_item(item_id).featured_image_id = image_id
    db.session.commit()
140
141
142 1
def publish_item(
    item_id: NewsItemID,
    *,
    publish_at: Optional[datetime] = None,
    initiator_id: Optional[UserID] = None,
) -> NewsItemPublished:
    """Publish a news item and return the corresponding event.

    If `publish_at` is not given, the current UTC time (as returned by
    `datetime.utcnow()`) is used as the publication time.

    The event's external URL is only set if the item's channel has an
    announcement site configured.

    Raise `ValueError` if the item is unknown or already published.
    """
    db_item = _get_db_item(item_id)
    if db_item.published:
        raise ValueError('News item has already been published')

    # `now` is also the event's `occurred_at`, independent of an
    # explicitly requested publication time.
    now = datetime.utcnow()
    effective_publish_at = now if publish_at is None else publish_at

    initiator: Optional[User] = (
        user_service.get_user(initiator_id)
        if initiator_id is not None
        else None
    )

    db_item.published_at = effective_publish_at
    db.session.commit()

    item = _db_entity_to_item(db_item)

    external_url: Optional[str] = None
    if item.channel.announcement_site_id is not None:
        site = site_service.get_site(SiteID(item.channel.announcement_site_id))
        external_url = f'https://{site.server_name}/news/{item.slug}'

    if initiator:
        event_initiator_id = initiator.id
        event_initiator_screen_name = initiator.screen_name
    else:
        event_initiator_id = None
        event_initiator_screen_name = None

    return NewsItemPublished(
        occurred_at=now,
        initiator_id=event_initiator_id,
        initiator_screen_name=event_initiator_screen_name,
        item_id=item.id,
        channel_id=item.channel.id,
        published_at=effective_publish_at,
        title=item.title,
        external_url=external_url,
    )
185
186
187 1
def unpublish_item(
    item_id: NewsItemID,
    *,
    initiator_id: Optional[UserID] = None,
) -> None:
    """Unpublish a news item by clearing its publication time.

    `initiator_id` is accepted but not used by this function.

    Raise `ValueError` if the item is unknown or not published.
    """
    db_item = _get_db_item(item_id)

    if db_item.published:
        db_item.published_at = None
        db.session.commit()
        return

    raise ValueError('News item is not published')
200
201
202 1
def delete_item(item_id: NewsItemID) -> None:
    """Delete a news item, its versions, and its current-version link."""
    # Statements run in this order: the current-version association
    # first, then the versions, and the item itself last.
    deletions = (
        delete(DbCurrentNewsItemVersionAssociation).where(
            DbCurrentNewsItemVersionAssociation.item_id == item_id
        ),
        delete(DbNewsItemVersion).where(DbNewsItemVersion.item_id == item_id),
        delete(DbNewsItem).where(DbNewsItem.id == item_id),
    )

    for deletion in deletions:
        db.session.execute(deletion)

    db.session.commit()
214
215
216 1
def find_item(item_id: NewsItemID) -> Optional[NewsItem]:
    """Look up the item with that ID; return `None` if there is none."""
    db_item = _find_db_item(item_id)

    return None if db_item is None else _db_entity_to_item(db_item)
224
225
226 1
def _find_db_item(item_id: NewsItemID) -> Optional[DbNewsItem]:
    """Load the database item with that ID; return `None` if not found.

    The item's channel and images are loaded eagerly via joins.
    """
    stmt = (
        select(DbNewsItem)
        .filter(DbNewsItem.id == item_id)
        .options(
            db.joinedload(DbNewsItem.channel),
            db.joinedload(DbNewsItem.images),
        )
    )

    result = db.session.scalars(stmt)
    return result.unique().one_or_none()
240
241
242 1
def _get_db_item(item_id: NewsItemID) -> DbNewsItem:
    """Load the database item with that ID.

    Raise `ValueError` if no such item exists.
    """
    db_item = _find_db_item(item_id)

    if db_item is not None:
        return db_item

    raise ValueError(f'Unknown news item ID "{item_id}".')
250
251
252 1
def find_aggregated_item_by_slug(
    channel_ids: set[NewsChannelID], slug: str, *, published_only: bool = False
) -> Optional[NewsItem]:
    """Look up a news item by slug within the given channels.

    With `published_only`, only items whose publication time is not in
    the future (compared to `datetime.utcnow()`) are considered.

    Return the item with its body rendered, or `None` if nothing
    matches.
    """
    eager_loads = (
        db.joinedload(DbNewsItem.channel),
        db.joinedload(DbNewsItem.current_version_association).joinedload(
            DbCurrentNewsItemVersionAssociation.version
        ),
        db.joinedload(DbNewsItem.images),
    )

    query = (
        select(DbNewsItem)
        .filter(DbNewsItem.channel_id.in_(channel_ids))
        .filter_by(slug=slug)
        .options(*eager_loads)
    )

    if published_only:
        query = query.filter(DbNewsItem.published_at <= datetime.utcnow())

    found = db.session.scalars(query).unique().one_or_none()

    if found is not None:
        return _db_entity_to_item(found, render_body=True)

    return None
280
281
282 1
def get_aggregated_items_paginated(
    channel_ids: set[NewsChannelID],
    page: int,
    items_per_page: int,
    *,
    published_only: bool = False,
) -> Pagination:
    """Return one page of news items from the given channels.

    Each database item is mapped to a `NewsItem` with its body
    rendered. With `published_only`, items whose publication time lies
    after `datetime.utcnow()` are excluded.
    """
    query = _get_items_stmt(channel_ids)

    if published_only:
        query = query.filter(DbNewsItem.published_at <= datetime.utcnow())

    return paginate(
        query,
        page,
        items_per_page,
        item_mapper=partial(_db_entity_to_item, render_body=True),
    )
299
300
301 1
def get_items_paginated(
    channel_ids: set[NewsChannelID], page: int, items_per_page: int
) -> Pagination:
    """Return one page of news items from the given channels.

    No item mapper is passed to `paginate()` and no publication filter
    is applied.
    """
    return paginate(_get_items_stmt(channel_ids), page, items_per_page)
308
309
310 1
def get_headlines_paginated(
    channel_ids: set[NewsChannelID],
    page: int,
    items_per_page: int,
    *,
    published_only: bool = False,
) -> Pagination:
    """Return one page of headlines from the given channels.

    Headlines are ordered by publication time, newest first. With
    `published_only`, items whose publication time lies after
    `datetime.utcnow()` are excluded.
    """
    current_version_load = db.joinedload(
        DbNewsItem.current_version_association
    ).joinedload(DbCurrentNewsItemVersionAssociation.version)

    query = (
        select(DbNewsItem)
        .filter(DbNewsItem.channel_id.in_(channel_ids))
        .options(current_version_load)
        .order_by(DbNewsItem.published_at.desc())
    )

    if published_only:
        query = query.filter(DbNewsItem.published_at <= datetime.utcnow())

    return paginate(
        query, page, items_per_page, item_mapper=_db_entity_to_headline
    )
336
337
338 1
def get_recent_headlines(
    channel_ids: frozenset[NewsChannelID] | set[NewsChannelID], limit: int
) -> list[NewsHeadline]:
    """Return up to `limit` headlines from the given channels.

    Only items whose publication time is not after `datetime.utcnow()`
    are included; they are ordered by publication time, newest first.
    """
    current_version_load = db.joinedload(
        DbNewsItem.current_version_association
    ).joinedload(DbCurrentNewsItemVersionAssociation.version)

    query = (
        select(DbNewsItem)
        .filter(DbNewsItem.channel_id.in_(channel_ids))
        .options(current_version_load)
        .filter(DbNewsItem.published_at <= datetime.utcnow())
        .order_by(DbNewsItem.published_at.desc())
        .limit(limit)
    )

    recent_db_items = db.session.scalars(query).unique().all()

    return list(map(_db_entity_to_headline, recent_db_items))
360
361
362 1
def _get_items_stmt(channel_ids: set[NewsChannelID]) -> Select:
    """Build the base statement selecting items of the given channels.

    Channel, current version, and images are loaded eagerly via joins;
    results are ordered by publication time, newest first.
    """
    eager_loads = (
        db.joinedload(DbNewsItem.channel),
        db.joinedload(DbNewsItem.current_version_association).joinedload(
            DbCurrentNewsItemVersionAssociation.version
        ),
        db.joinedload(DbNewsItem.images),
    )

    stmt = select(DbNewsItem).filter(DbNewsItem.channel_id.in_(channel_ids))
    stmt = stmt.options(*eager_loads)
    return stmt.order_by(DbNewsItem.published_at.desc())
375
376
377 1
def get_item_versions(item_id: NewsItemID) -> list[DbNewsItemVersion]:
    """Return every version of the item, ordered by creation time with
    the newest first.
    """
    newest_first = DbNewsItemVersion.created_at.desc()
    stmt = (
        select(DbNewsItemVersion)
        .filter_by(item_id=item_id)
        .order_by(newest_first)
    )

    return db.session.scalars(stmt).all()
384
385
386 1
def get_current_item_version(item_id: NewsItemID) -> DbNewsItemVersion:
    """Return the version currently set for the item.

    Raise `ValueError` if no item with that ID exists.
    """
    return _get_db_item(item_id).current_version
391
392
393 1
def find_item_version(
    version_id: NewsItemVersionID,
) -> Optional[DbNewsItemVersion]:
    """Return the item version with that ID, or `None` if not found."""
    # `Session.get()` yields `None` for an unknown primary key, so the
    # return type has to be optional.
    return db.session.get(DbNewsItemVersion, version_id)
396
397
398 1
def has_channel_items(channel_id: NewsChannelID) -> bool:
    """Return `True` if the channel contains items.

    The check is an `EXISTS` subquery on the item table, filtered by
    the item's own `channel_id` column, so no join against the channel
    table is needed.
    """
    return db.session.scalar(
        select(
            select(DbNewsItem)
            .filter(DbNewsItem.channel_id == channel_id)
            .exists()
        )
    )
408
409
410 1
def get_item_count_by_channel_id() -> dict[NewsChannelID, int]:
    """Return the number of news items per channel, keyed by channel ID.

    Channels are outer-joined with their items, so channels without
    items are included with a count of 0.
    """
    item_count = db.func.count(DbNewsItem.id)
    stmt = (
        select(DbNewsChannel.id, item_count)
        .outerjoin(DbNewsItem)
        .group_by(DbNewsChannel.id)
    )

    rows = db.session.execute(stmt).all()

    return dict(rows)
421
422
423 1
def _db_entity_to_item(
    db_item: DbNewsItem, *, render_body: Optional[bool] = False
) -> NewsItem:
    """Convert a database news item into a `NewsItem` object.

    Title, body, and body format are taken from the item's current
    version. If `render_body` is truthy, the body is replaced with the
    result of `_render_body()`.
    """
    channel = news_channel_service._db_entity_to_channel(db_item.channel)
    image_url_path = _assemble_image_url_path(db_item)
    images = [
        news_image_service._db_entity_to_image(db_image, channel.id)
        for db_image in db_item.images
    ]
    current_version = db_item.current_version

    item = NewsItem(
        id=db_item.id,
        channel=channel,
        slug=db_item.slug,
        published_at=db_item.published_at,
        published=db_item.published_at is not None,
        title=current_version.title,
        body=current_version.body,
        body_format=current_version.body_format,
        image_url_path=image_url_path,
        images=images,
        featured_image_id=db_item.featured_image_id,
    )

    if not render_body:
        return item

    # Rendering needs the assembled item, so the body is swapped in
    # afterwards on a copy.
    return dataclasses.replace(item, body=_render_body(item))
453
454
455 1
def _assemble_image_url_path(db_item: DbNewsItem) -> Optional[str]:
    """Return the URL path of the current version's image.

    The path is prefixed with the item's channel directory. Return
    `None` if the current version has no (or an empty) image URL path.
    """
    relative_path = db_item.current_version.image_url_path

    if relative_path:
        return (
            f'/data/global/news_channels/{db_item.channel_id}/{relative_path}'
        )

    return None
462
463
464 1
def _render_body(item: NewsItem) -> Optional[str]:
    """Render the item's body text to HTML.

    Rendering is best-effort: if the renderer raises, the failure is
    logged (including the traceback) and `None` is returned instead of
    propagating the exception.
    """
    # Imported locally so this block does not depend on a module-level
    # import being present.
    import logging

    try:
        return news_html_service.render_body(item, item.body, item.body_format)
    except Exception:
        # Keep the fallback, but leave a trace of what went wrong
        # rather than discarding the error silently.
        logging.getLogger(__name__).exception(
            'Rendering body of news item "%s" failed.', item.id
        )
        return None  # Not the best error indicator.
470
471
472 1
def _db_entity_to_headline(db_item: DbNewsItem) -> NewsHeadline:
    """Convert a database news item into a `NewsHeadline` object.

    The title is taken from the item's current version; the item counts
    as published if it has a publication time set.
    """
    published_at = db_item.published_at
    is_published = published_at is not None

    return NewsHeadline(
        slug=db_item.slug,
        published_at=published_at,
        published=is_published,
        title=db_item.current_version.title,
    )
479