Passed
Push — master ( 875b3f...ba269f )
by Jochen
02:30
created

byceps.services.news.service.render_body()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from datetime import datetime
10
from typing import Dict, Optional, Sequence
11
12
from ...database import db, paginate, Pagination, Query
13
from ...events.news import NewsItemPublished
14
from ...typing import BrandID, UserID
15
from ...util.templating import load_template
16
17
from ..brand.models.brand import Brand as DbBrand
18
19
from .channel_service import _db_entity_to_channel
20
from .models.channel import Channel as DbChannel
21
from .models.item import (
22
    CurrentVersionAssociation as DbCurrentVersionAssociation,
23
    Item as DbItem,
24
    ItemVersion as DbItemVersion,
25
)
26
from . import image_service
27
from .transfer.models import ChannelID, Item, ItemID, ItemVersionID
28
29
30
def create_item(
31
    brand_id: BrandID,
32
    slug: str,
33
    creator_id: UserID,
34
    title: str,
35
    body: str,
36
    *,
37
    image_url_path: Optional[str] = None,
38
) -> Item:
39
    """Create a news item, a version, and set the version as the item's
40
    current one.
41
    """
42
    item = DbItem(brand_id, slug)
43
    db.session.add(item)
44
45
    version = _create_version(
46
        item, creator_id, title, body, image_url_path=image_url_path
47
    )
48
    db.session.add(version)
49
50
    current_version_association = DbCurrentVersionAssociation(item, version)
51
    db.session.add(current_version_association)
52
53
    db.session.commit()
54
55
    return _db_entity_to_item(item)
56
57
58
def update_item(
59
    item_id: ItemID,
60
    slug: str,
61
    creator_id: UserID,
62
    title: str,
63
    body: str,
64
    *,
65
    image_url_path: Optional[str] = None,
66
) -> None:
67
    """Update a news item by creating a new version of it and setting
68
    the new version as the current one.
69
    """
70
    item = _get_db_item(item_id)
71
72
    item.slug = slug
73
    db.session.add(item)
74
75
    version = _create_version(
76
        item, creator_id, title, body, image_url_path=image_url_path
77
    )
78
    db.session.add(version)
79
80
    item.current_version = version
81
82
    db.session.commit()
83
84
85
def _create_version(
86
    item: DbItem,
87
    creator_id: UserID,
88
    title: str,
89
    body: str,
90
    *,
91
    image_url_path: Optional[str] = None,
92
) -> DbItemVersion:
93
    version = DbItemVersion(item, creator_id, title, body)
94
95
    if image_url_path:
96
        version.image_url_path = image_url_path
97
98
    return version
99
100
101
def publish_item(item_id: ItemID) -> NewsItemPublished:
102
    """Publish a news item."""
103
    item = _get_db_item(item_id)
104
105
    item.published_at = datetime.utcnow()
106
    db.session.commit()
107
108
    return NewsItemPublished(occurred_at=item.published_at, item_id=item.id)
109
110
111
def find_item(item_id: ItemID) -> Optional[Item]:
112
    """Return the item with that id, or `None` if not found."""
113
    item = _find_db_item(item_id)
114
115
    if item is None:
116
        return None
117
118
    return _db_entity_to_item(item)
119
120
121
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
122
    """Return the item with that id, or `None` if not found."""
123
    return DbItem.query \
124
        .with_channel() \
125
        .get(item_id)
126
127
128
def _get_db_item(item_id: ItemID) -> DbItem:
129
    """Return the item with that id, or raise an exception."""
130
    item = _find_db_item(item_id)
131
132
    if item is None:
133
        raise ValueError(f'Unknown news item ID "{item_id}".')
134
135
    return item
136
137
138
def find_aggregated_item_by_slug(
139
    channel_id: ChannelID, slug: str, *, published_only: bool = False
140
) -> Optional[Item]:
141
    """Return the news item identified by that slug, or `None` if not found."""
142
    query = DbItem.query \
143
        .for_channel(channel_id) \
144
        .with_channel() \
145
        .with_current_version() \
146
        .filter_by(slug=slug)
147
148
    if published_only:
149
        query = query.published()
150
151
    item = query.one_or_none()
152
153
    if item is None:
154
        return None
155
156
    return _db_entity_to_item(item)
157
158
159
def get_aggregated_items_paginated(
160
    channel_id: ChannelID,
161
    page: int,
162
    items_per_page: int,
163
    *,
164
    published_only: bool = False,
165
) -> Pagination:
166
    """Return the news items to show on the specified page."""
167
    query = _get_items_query(channel_id)
168
169
    if published_only:
170
        query = query.published()
171
172
    return paginate(query, page, items_per_page, item_mapper=_db_entity_to_item)
173
174
175
def get_items_paginated(
176
    channel_id: ChannelID, page: int, items_per_page: int
177
) -> Pagination:
178
    """Return the news items to show on the specified page."""
179
    return _get_items_query(channel_id) \
180
        .paginate(page, items_per_page)
181
182
183
def _get_items_query(channel_id: ChannelID) -> Query:
184
    return DbItem.query \
185
        .for_channel(channel_id) \
186
        .with_channel() \
187
        .with_current_version() \
188
        .order_by(DbItem.published_at.desc())
189
190
191
def get_item_versions(item_id: ItemID) -> Sequence[DbItemVersion]:
192
    """Return all item versions, sorted from most recent to oldest."""
193
    return DbItemVersion.query \
194
        .for_item(item_id) \
195
        .order_by(DbItemVersion.created_at.desc()) \
196
        .all()
197
198
199
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
200
    """Return the item's current version."""
201
    item = _get_db_item(item_id)
202
203
    return item.current_version
204
205
206
def find_item_version(version_id: ItemVersionID) -> DbItemVersion:
207
    """Return the item version with that ID, or `None` if not found."""
208
    return DbItemVersion.query.get(version_id)
209
210
211
def count_items_for_brand(brand_id: BrandID) -> int:
212
    """Return the number of news items for that brand."""
213
    return DbItem.query \
214
        .join(DbChannel) \
215
        .filter(DbChannel.brand_id == brand_id) \
216
        .count()
217
218
219
def get_item_count_by_brand_id() -> Dict[BrandID, int]:
220
    """Return news item count (including 0) per brand, indexed by brand ID."""
221
    brand_ids_and_item_counts = db.session \
222
        .query(
223
            DbBrand.id,
224
            db.func.count(DbItem.id)
225
        ) \
226
        .outerjoin(DbChannel, DbChannel.brand_id == DbBrand.id) \
227
        .outerjoin(DbItem) \
228
        .group_by(DbBrand.id) \
229
        .all()
230
231
    return dict(brand_ids_and_item_counts)
232
233
234
def get_item_count_by_channel_id() -> Dict[ChannelID, int]:
235
    """Return news item count (including 0) per channel, indexed by
236
    channel ID.
237
    """
238
    channel_ids_and_item_counts = db.session \
239
        .query(
240
            DbChannel.id,
241
            db.func.count(DbItem.id)
242
        ) \
243
        .outerjoin(DbItem) \
244
        .group_by(DbChannel.id) \
245
        .all()
246
247
    return dict(channel_ids_and_item_counts)
248
249
250
def _db_entity_to_item(item: DbItem) -> Item:
251
    channel = _db_entity_to_channel(item.channel)
252
    external_url = item.channel.url_prefix + item.slug
253
    image_url_path = _assemble_image_url_path(item)
254
    images = [image_service._db_entity_to_image(image) for image in item.images]
255
256
    return Item(
257
        id=item.id,
258
        channel=channel,
259
        slug=item.slug,
260
        published_at=item.published_at,
261
        published=item.published_at is not None,
262
        title=item.current_version.title,
263
        body=item.current_version.body,
264
        external_url=external_url,
265
        image_url_path=image_url_path,
266
        images=images,
267
    )
268
269
270
def _assemble_image_url_path(item: DbItem) -> Optional[str]:
271
    url_path = item.current_version.image_url_path
272
273
    if not url_path:
274
        return None
275
276
    return f'/brand/news/{url_path}'
277
278
279
def render_body(raw_body: str) -> str:
280
    """Render item's raw body to HTML."""
281
    template = load_template(raw_body)
282
    return template.render()
283