Passed
Branch main (854eb5)
by Jochen
04:24
created

paginate_topics()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 4
dl 0
loc 10
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
byceps.services.board.topic_query_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from datetime import datetime
11 1
from typing import Optional
12
13 1
from ...database import db, Pagination, Query
14
15 1
from .dbmodels.category import Category as DbCategory
16 1
from .dbmodels.posting import Posting as DbPosting
17 1
from .dbmodels.topic import Topic as DbTopic
18 1
from .transfer.models import BoardID, CategoryID, TopicID
19
20
21 1
def count_topics_for_board(board_id: BoardID) -> int:
    """Count the topics whose category belongs to the given board."""
    # Topics carry no board reference of their own here; the board is
    # reached through the joined category.
    topics_in_board = DbTopic.query.join(DbCategory).filter(
        DbCategory.board_id == board_id
    )
    return topics_in_board.count()
27
28
29 1
def find_topic_by_id(topic_id: TopicID) -> Optional[DbTopic]:
    """Look up a single topic by its ID.

    The result is `None` when no topic with that ID exists.
    """
    topic_query = db.session.query(DbTopic)
    return topic_query.get(topic_id)
32
33
34 1
def get_topic(topic_id: TopicID) -> DbTopic:
    """Fetch the topic with the given ID.

    Unlike `find_topic_by_id`, a missing topic is treated as an error:
    `ValueError` is raised if no topic with that ID exists.
    """
    found = find_topic_by_id(topic_id)
    if found is not None:
        return found

    raise ValueError(f'Unknown topic ID "{topic_id}"')
42
43
44 1
def find_topic_visible_for_user(
    topic_id: TopicID, include_hidden: bool
) -> Optional[DbTopic]:
    """Look up a topic by ID, respecting what the user may see.

    The topic's category is loaded eagerly alongside it. Unless
    `include_hidden` is set, the query is narrowed with
    `without_hidden()` first. The result is `None` if no matching
    topic remains.
    """
    candidates = DbTopic.query.options(db.joinedload(DbTopic.category))
    visible = candidates if include_hidden else candidates.without_hidden()
    return visible.filter_by(id=topic_id).first()
61
62
63 1
def get_recent_topics(
    board_id: BoardID, include_hidden: bool, limit: int
) -> list[DbTopic]:
    """Return the most recently updated topics in that board.

    Topics are ordered by their last update, newest first, and at most
    `limit` of them are returned. Topics in hidden categories are
    always excluded; `include_hidden` only controls whether the base
    topic query is narrowed with `without_hidden()`.
    """
    return _query_topics(include_hidden) \
        .join(DbCategory) \
            .filter(DbCategory.board_id == board_id) \
            .filter(DbCategory.hidden == False) \
        .order_by(DbTopic.last_updated_at.desc()) \
        .limit(limit) \
        .all()
74
75
76 1
def paginate_topics(
    board_id: BoardID, include_hidden: bool, page: int, topics_per_page: int
) -> Pagination:
    """Return one page of the board's topics, newest update first.

    Topics in hidden categories are always excluded; `include_hidden`
    only controls whether the base topic query is narrowed with
    `without_hidden()`.
    """
    board_topics = (
        _query_topics(include_hidden)
        .join(DbCategory)
        .filter(DbCategory.board_id == board_id)
        # Builds a SQL expression, so `== False` is intentional here.
        .filter(DbCategory.hidden == False)  # noqa: E712
    )
    newest_first = board_topics.order_by(DbTopic.last_updated_at.desc())
    return newest_first.paginate(page, topics_per_page)
86
87
88 1
def get_all_topic_ids_in_category(category_id: CategoryID) -> set[TopicID]:
    """Collect the IDs of every topic that belongs to the category."""
    # Only the ID column is selected, so each result row is a 1-tuple.
    id_rows = (
        db.session.query(DbTopic.id)
        .filter(DbTopic.category_id == category_id)
        .all()
    )
    return {topic_id for (topic_id,) in id_rows}
96
97
98 1
def paginate_topics_of_category(
    category_id: CategoryID,
    include_hidden: bool,
    page: int,
    topics_per_page: int,
) -> Pagination:
    """Return one page of the category's topics, as visible for the user.

    Pinned topics sort before unpinned ones; within each group the most
    recently updated topic comes first.
    """
    category_topics = _query_topics(include_hidden).for_category(category_id)
    ordered = category_topics.order_by(
        DbTopic.pinned.desc(),
        DbTopic.last_updated_at.desc(),
    )
    return ordered.paginate(page, topics_per_page)
112
113
114 1
def _query_topics(include_hidden: bool) -> Query:
    """Build the base topic query used by the listing functions.

    The `category`, `last_updated_by`, `hidden_by`, `locked_by` and
    `pinned_by` relationships are joined-loaded with each topic. Unless
    `include_hidden` is set, the query is narrowed with
    `without_hidden()`.
    """
    eager_loads = (
        db.joinedload(DbTopic.category),
        db.joinedload(DbTopic.last_updated_by),
        db.joinedload(DbTopic.hidden_by),
        db.joinedload(DbTopic.locked_by),
        db.joinedload(DbTopic.pinned_by),
    )
    base_query = DbTopic.query.options(*eager_loads)

    if include_hidden:
        return base_query

    return base_query.without_hidden()
128
129
130 1
def find_default_posting_to_jump_to(
    topic_id: TopicID, include_hidden: bool, last_viewed_at: datetime
) -> Optional[DbPosting]:
    """Pick the posting of the topic that should be shown by default.

    That is the earliest posting created after `last_viewed_at`. If
    there is none, the latest posting of the topic is used instead.
    The result is `None` if the topic has no matching postings at all.
    """
    postings = DbPosting.query.for_topic(topic_id)
    if not include_hidden:
        postings = postings.without_hidden()

    unseen = postings.filter(DbPosting.created_at > last_viewed_at)
    first_unseen = unseen.earliest_to_latest().first()
    if first_unseen is not None:
        return first_unseen

    # Nothing new since the last view, so fall back to the last posting.
    return postings.latest_to_earliest().first()
150