Passed
Push — master ( d29cbb...0943a5 )
by Jochen
02:31
created

byceps.services.news.service._assemble_image_url_path()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2019 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from datetime import datetime
10
from typing import Dict, Optional, Sequence
11
12
from ...database import db, paginate, Pagination, Query
13
from ...events.news import NewsItemPublished
14
from ...typing import BrandID, UserID
15
16
from ..brand.models.brand import Brand as DbBrand
17
18
from .channel_service import _db_entity_to_channel
19
from .models.channel import Channel as DbChannel
20
from .models.item import (
21
    CurrentVersionAssociation as DbCurrentVersionAssociation,
22
    Item as DbItem,
23
    ItemVersion as DbItemVersion,
24
)
25
from . import image_service
26
from .transfer.models import ChannelID, Item, ItemID, ItemVersionID
27
28
29
def create_item(
    brand_id: BrandID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> Item:
    """Create a news item along with its first version.

    The item, its initial version, and the association that marks this
    version as the item's current one are added to the session and
    persisted with a single commit. The new item is returned as a
    transfer object.
    """
    db_item = DbItem(brand_id, slug)
    db.session.add(db_item)

    initial_version = _create_version(
        db_item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(initial_version)

    # Link the freshly created version to the item as its current one.
    db.session.add(DbCurrentVersionAssociation(db_item, initial_version))

    db.session.commit()

    return _db_entity_to_item(db_item)
55
56
57
def update_item(
    item_id: ItemID,
    slug: str,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> None:
    """Update a news item.

    The item's slug is overwritten, a new version carrying the given
    title, body, and optional image URL path is created, and that
    version becomes the item's current one. All changes are persisted
    with a single commit.

    Raise `ValueError` if no item with that ID exists.
    """
    db_item = _get_db_item(item_id)
    db_item.slug = slug
    db.session.add(db_item)

    new_version = _create_version(
        db_item, creator_id, title, body, image_url_path=image_url_path
    )
    db.session.add(new_version)

    db_item.current_version = new_version

    db.session.commit()
82
83
84
def _create_version(
    item: DbItem,
    creator_id: UserID,
    title: str,
    body: str,
    *,
    image_url_path: Optional[str] = None,
) -> DbItemVersion:
    """Build a new (not yet persisted) version for the item.

    The image URL path is only set on the version if a non-empty value
    was given.
    """
    new_version = DbItemVersion(item, creator_id, title, body)

    if not image_url_path:
        # Nothing to set; leave the attribute untouched.
        return new_version

    new_version.image_url_path = image_url_path
    return new_version
98
99
100
def publish_item(item_id: ItemID) -> NewsItemPublished:
    """Publish a news item.

    Set the item's publication timestamp to the current UTC time,
    commit, and return an event describing the publication.

    Raise `ValueError` if no item with that ID exists.
    """
    db_item = _get_db_item(item_id)
    db_item.published_at = datetime.utcnow()

    db.session.commit()

    event = NewsItemPublished(
        occurred_at=db_item.published_at,
        item_id=db_item.id,
    )
    return event
108
109
110
def find_item(item_id: ItemID) -> Optional[Item]:
    """Look up the item with that ID.

    Return it as a transfer object, or `None` if there is no such item.
    """
    db_item = _find_db_item(item_id)
    return _db_entity_to_item(db_item) if db_item is not None else None
118
119
120
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
    """Look up the database item with that ID, or `None` if not found."""
    query = DbItem.query.with_channel()
    return query.get(item_id)
125
126
127
def _get_db_item(item_id: ItemID) -> DbItem:
    """Look up the database item with that ID.

    Raise `ValueError` if no item with that ID exists.
    """
    db_item = _find_db_item(item_id)

    if db_item is not None:
        return db_item

    raise ValueError(f'Unknown news item ID "{item_id}".')
135
136
137
def find_aggregated_item_by_slug(
    channel_id: ChannelID, slug: str, *, published_only: bool = False
) -> Optional[Item]:
    """Look up the news item with that slug in the given channel.

    If `published_only` is set, the query is additionally narrowed via
    its `published()` filter. Return the item as a transfer object, or
    `None` if no item matches.
    """
    slug_query = (
        DbItem.query
        .for_channel(channel_id)
        .with_channel()
        .with_current_version()
        .filter_by(slug=slug)
    )

    if published_only:
        slug_query = slug_query.published()

    db_item = slug_query.one_or_none()

    return _db_entity_to_item(db_item) if db_item is not None else None
156
157
158
def get_aggregated_items_paginated(
    channel_id: ChannelID,
    page: int,
    items_per_page: int,
    *,
    published_only: bool = False,
) -> Pagination:
    """Return one page of the channel's news items as transfer objects.

    If `published_only` is set, the query is additionally narrowed via
    its `published()` filter before pagination.
    """
    items_query = _get_items_query(channel_id)

    if published_only:
        items_query = items_query.published()

    return paginate(
        items_query,
        page,
        items_per_page,
        item_mapper=_db_entity_to_item,
    )
172
173
174
def get_items_paginated(
    channel_id: ChannelID, page: int, items_per_page: int
) -> Pagination:
    """Return one page of the channel's news items.

    Unlike `get_aggregated_items_paginated`, this paginates the query
    directly, without passing an item mapper.
    """
    items_query = _get_items_query(channel_id)
    return items_query.paginate(page, items_per_page)
180
181
182
def _get_items_query(channel_id: ChannelID) -> Query:
    """Build the query for the channel's items, including channel and
    current version, ordered by publication date, latest first.
    """
    return (
        DbItem.query
        .for_channel(channel_id)
        .with_channel()
        .with_current_version()
        .order_by(DbItem.published_at.desc())
    )
188
189
190
def get_item_versions(item_id: ItemID) -> Sequence[DbItemVersion]:
    """Return all versions of the item, ordered by creation date,
    latest first.
    """
    versions_query = (
        DbItemVersion.query
        .for_item(item_id)
        .order_by(DbItemVersion.created_at.desc())
    )
    return versions_query.all()
196
197
198
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
    """Return the current version of the item with that ID.

    Raise `ValueError` if no item with that ID exists.
    """
    return _get_db_item(item_id).current_version
203
204
205
def find_item_version(version_id: ItemVersionID) -> Optional[DbItemVersion]:
    """Return the item version with that ID, or `None` if not found."""
    # The primary-key lookup yields `None` for an unknown ID, hence the
    # `Optional` return type.
    return DbItemVersion.query.get(version_id)
208
209
210
def count_items_for_brand(brand_id: BrandID) -> int:
    """Count the news items in channels that belong to that brand."""
    brand_items_query = (
        DbItem.query
        .join(DbChannel)
        .filter(DbChannel.brand_id == brand_id)
    )
    return brand_items_query.count()
216
217
218
def get_item_count_by_brand_id() -> Dict[BrandID, int]:
    """Map each brand ID to the number of news items of that brand.

    Channels and items are outer-joined, so brands without any items
    are included with a count of 0.
    """
    rows = (
        db.session
        .query(DbBrand.id, db.func.count(DbItem.id))
        .outerjoin(DbChannel, DbChannel.brand_id == DbBrand.id)
        .outerjoin(DbItem)
        .group_by(DbBrand.id)
        .all()
    )

    return {brand_id: item_count for brand_id, item_count in rows}
231
232
233
def get_item_count_by_channel_id() -> Dict[ChannelID, int]:
    """Map each channel ID to the number of news items in that channel.

    Items are outer-joined, so channels without any items are included
    with a count of 0.
    """
    rows = (
        db.session
        .query(DbChannel.id, db.func.count(DbItem.id))
        .outerjoin(DbItem)
        .group_by(DbChannel.id)
        .all()
    )

    return {channel_id: item_count for channel_id, item_count in rows}
247
248
249
def _db_entity_to_item(item: DbItem) -> Item:
    """Convert the database item into a transfer object.

    The body is taken from the rendered current version, and the
    external URL is the channel's URL prefix followed by the item's
    slug.
    """
    channel_dto = _db_entity_to_channel(item.channel)
    rendered_body = item.current_version.render_body()
    url = item.channel.url_prefix + item.slug
    image_path = _assemble_image_url_path(item)

    image_dtos = []
    for db_image in item.images:
        image_dtos.append(image_service._db_entity_to_image(db_image))

    return Item(
        id=item.id,
        channel=channel_dto,
        slug=item.slug,
        published_at=item.published_at,
        # An item counts as published once it has a publication date.
        published=item.published_at is not None,
        title=item.title,
        body=rendered_body,
        external_url=url,
        image_url_path=image_path,
        images=image_dtos,
    )
268
269
270
def _assemble_image_url_path(item: DbItem) -> Optional[str]:
    """Return the image URL path of the item's current version, prefixed
    with `/brand/news/`, or `None` if the version has no (or an empty)
    image URL path.
    """
    relative_path = item.current_version.image_url_path
    return f'/brand/news/{relative_path}' if relative_path else None
277