Completed
Push — main ( dc9c2e...80557c )
by Jochen
05:20
created

byceps.services.news.service   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 292
Duplicated Lines 0 %

Test Coverage

Coverage 91.67%

Importance

Changes 0
Metric Value
eloc 178
dl 0
loc 292
ccs 88
cts 96
cp 0.9167
rs 10
c 0
b 0
f 0
wmc 28

18 Functions

Rating   Name   Duplication   Size   Complexity  
A _db_entity_to_item() 0 20 1
A _create_version() 0 14 2
A delete_item() 0 15 1
A _assemble_image_url_path() 0 7 2
A update_item() 0 24 1
A create_item() 0 26 1
A find_aggregated_item_by_slug() 0 20 3
A get_aggregated_items_paginated() 0 14 2
A _get_items_query() 0 7 1
A _find_db_item() 0 6 1
A _get_db_item() 0 8 2
A find_item() 0 8 2
A get_items_paginated() 0 6 1
A get_item_versions() 0 6 1
A get_current_item_version() 0 5 1
A publish_item() 0 24 4
A find_item_version() 0 3 1
A get_item_count_by_channel_id() 0 14 1
1
"""
2
byceps.services.news.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2020 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9 1
from datetime import datetime
10 1
from typing import Dict, Optional, Sequence
11
12 1
from ...database import db, paginate, Pagination, Query
13 1
from ...events.news import NewsItemPublished
14 1
from ...typing import UserID
15
16 1
from ..user import service as user_service
17
18 1
from .channel_service import _db_entity_to_channel
19 1
from .models.channel import Channel as DbChannel
20 1
from .models.item import (
21
    CurrentVersionAssociation as DbCurrentVersionAssociation,
22
    Item as DbItem,
23
    ItemVersion as DbItemVersion,
24
)
25 1
from . import image_service
26 1
from .transfer.models import ChannelID, Item, ItemID, ItemVersionID
27
28
29 1
def create_item(
30
    channel_id: ChannelID,
31
    slug: str,
32
    creator_id: UserID,
33
    title: str,
34
    body: str,
35
    *,
36
    image_url_path: Optional[str] = None,
37
) -> Item:
38
    """Create a news item, a version, and set the version as the item's
39
    current one.
40
    """
41 1
    item = DbItem(channel_id, slug)
42 1
    db.session.add(item)
43
44 1
    version = _create_version(
45
        item, creator_id, title, body, image_url_path=image_url_path
46
    )
47 1
    db.session.add(version)
48
49 1
    current_version_association = DbCurrentVersionAssociation(item, version)
50 1
    db.session.add(current_version_association)
51
52 1
    db.session.commit()
53
54 1
    return _db_entity_to_item(item)
55
56
57 1
def update_item(
58
    item_id: ItemID,
59
    slug: str,
60
    creator_id: UserID,
61
    title: str,
62
    body: str,
63
    *,
64
    image_url_path: Optional[str] = None,
65
) -> None:
66
    """Update a news item by creating a new version of it and setting
67
    the new version as the current one.
68
    """
69
    item = _get_db_item(item_id)
70
71
    item.slug = slug
72
73
    version = _create_version(
74
        item, creator_id, title, body, image_url_path=image_url_path
75
    )
76
    db.session.add(version)
77
78
    item.current_version = version
79
80
    db.session.commit()
81
82
83 1
def _create_version(
84
    item: DbItem,
85
    creator_id: UserID,
86
    title: str,
87
    body: str,
88
    *,
89
    image_url_path: Optional[str] = None,
90
) -> DbItemVersion:
91 1
    version = DbItemVersion(item, creator_id, title, body)
92
93 1
    if image_url_path:
94 1
        version.image_url_path = image_url_path
95
96 1
    return version
97
98
99 1
def publish_item(
100
    item_id: ItemID, *, initiator_id: Optional[UserID] = None
101
) -> NewsItemPublished:
102
    """Publish a news item."""
103 1
    db_item = _get_db_item(item_id)
104
105 1
    if initiator_id is not None:
106 1
        initiator = user_service.get_user(initiator_id)
107
    else:
108 1
        initiator = None
109
110 1
    db_item.published_at = datetime.utcnow()
111 1
    db.session.commit()
112
113 1
    item = _db_entity_to_item(db_item)
114
115 1
    return NewsItemPublished(
116
        occurred_at=item.published_at,
117
        initiator_id=initiator.id if initiator else None,
118
        initiator_screen_name=initiator.screen_name if initiator else None,
119
        item_id=item.id,
120
        channel_id=item.channel.id,
121
        title=item.title,
122
        external_url=item.external_url,
123
    )
124
125
126 1
def delete_item(item_id: ItemID) -> None:
127
    """Delete a news item and its versions."""
128 1
    db.session.query(DbCurrentVersionAssociation) \
129
        .filter_by(item_id=item_id) \
130
        .delete()
131
132 1
    db.session.query(DbItemVersion) \
133
        .filter_by(item_id=item_id) \
134
        .delete()
135
136 1
    db.session.query(DbItem) \
137
        .filter_by(id=item_id) \
138
        .delete()
139
140 1
    db.session.commit()
141
142
143 1
def find_item(item_id: ItemID) -> Optional[Item]:
144
    """Return the item with that id, or `None` if not found."""
145 1
    item = _find_db_item(item_id)
146
147 1
    if item is None:
148
        return None
149
150 1
    return _db_entity_to_item(item)
151
152
153 1
def _find_db_item(item_id: ItemID) -> Optional[DbItem]:
154
    """Return the item with that id, or `None` if not found."""
155 1
    return DbItem.query \
156
        .with_channel() \
157
        .with_images() \
158
        .get(item_id)
159
160
161 1
def _get_db_item(item_id: ItemID) -> DbItem:
162
    """Return the item with that id, or raise an exception."""
163 1
    item = _find_db_item(item_id)
164
165 1
    if item is None:
166
        raise ValueError(f'Unknown news item ID "{item_id}".')
167
168 1
    return item
169
170
171 1
def find_aggregated_item_by_slug(
172
    channel_id: ChannelID, slug: str, *, published_only: bool = False
173
) -> Optional[Item]:
174
    """Return the news item identified by that slug, or `None` if not found."""
175 1
    query = DbItem.query \
176
        .for_channel(channel_id) \
177
        .with_channel() \
178
        .with_current_version() \
179
        .with_images() \
180
        .filter_by(slug=slug)
181
182 1
    if published_only:
183 1
        query = query.published()
184
185 1
    item = query.one_or_none()
186
187 1
    if item is None:
188 1
        return None
189
190 1
    return _db_entity_to_item(item)
191
192
193 1
def get_aggregated_items_paginated(
194
    channel_id: ChannelID,
195
    page: int,
196
    items_per_page: int,
197
    *,
198
    published_only: bool = False,
199
) -> Pagination:
200
    """Return the news items to show on the specified page."""
201 1
    query = _get_items_query(channel_id)
202
203 1
    if published_only:
204 1
        query = query.published()
205
206 1
    return paginate(query, page, items_per_page, item_mapper=_db_entity_to_item)
207
208
209 1
def get_items_paginated(
210
    channel_id: ChannelID, page: int, items_per_page: int
211
) -> Pagination:
212
    """Return the news items to show on the specified page."""
213 1
    return _get_items_query(channel_id) \
214
        .paginate(page, items_per_page)
215
216
217 1
def _get_items_query(channel_id: ChannelID) -> Query:
218 1
    return DbItem.query \
219
        .for_channel(channel_id) \
220
        .with_channel() \
221
        .with_current_version() \
222
        .with_images() \
223
        .order_by(DbItem.published_at.desc())
224
225
226 1
def get_item_versions(item_id: ItemID) -> Sequence[DbItemVersion]:
227
    """Return all item versions, sorted from most recent to oldest."""
228 1
    return DbItemVersion.query \
229
        .for_item(item_id) \
230
        .order_by(DbItemVersion.created_at.desc()) \
231
        .all()
232
233
234 1
def get_current_item_version(item_id: ItemID) -> DbItemVersion:
235
    """Return the item's current version."""
236 1
    item = _get_db_item(item_id)
237
238 1
    return item.current_version
239
240
241 1
def find_item_version(version_id: ItemVersionID) -> DbItemVersion:
242
    """Return the item version with that ID, or `None` if not found."""
243 1
    return DbItemVersion.query.get(version_id)
244
245
246 1
def get_item_count_by_channel_id() -> Dict[ChannelID, int]:
247
    """Return news item count (including 0) per channel, indexed by
248
    channel ID.
249
    """
250 1
    channel_ids_and_item_counts = db.session \
251
        .query(
252
            DbChannel.id,
253
            db.func.count(DbItem.id)
254
        ) \
255
        .outerjoin(DbItem) \
256
        .group_by(DbChannel.id) \
257
        .all()
258
259 1
    return dict(channel_ids_and_item_counts)
260
261
262 1
def _db_entity_to_item(item: DbItem) -> Item:
263 1
    channel = _db_entity_to_channel(item.channel)
264 1
    external_url = item.channel.url_prefix + item.slug
265 1
    image_url_path = _assemble_image_url_path(item)
266 1
    images = [
267
        image_service._db_entity_to_image(image, item.channel_id)
268
        for image in item.images
269
    ]
270
271 1
    return Item(
272
        id=item.id,
273
        channel=channel,
274
        slug=item.slug,
275
        published_at=item.published_at,
276
        published=item.published_at is not None,
277
        title=item.current_version.title,
278
        body=item.current_version.body,
279
        external_url=external_url,
280
        image_url_path=image_url_path,
281
        images=images,
282
    )
283
284
285 1
def _assemble_image_url_path(item: DbItem) -> Optional[str]:
286 1
    url_path = item.current_version.image_url_path
287
288 1
    if not url_path:
289 1
        return None
290
291
    return f'/data/global/news_channels/{item.channel_id}/{url_path}'
292