1
|
|
|
""" |
2
|
|
|
byceps.services.board.topic_query_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2021 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from __future__ import annotations |
10
|
1 |
|
from datetime import datetime |
11
|
1 |
|
from typing import Optional |
12
|
|
|
|
13
|
1 |
|
from ...database import db, Pagination, Query |
14
|
|
|
|
15
|
1 |
|
from .dbmodels.category import Category as DbCategory |
16
|
1 |
|
from .dbmodels.posting import Posting as DbPosting |
17
|
1 |
|
from .dbmodels.topic import Topic as DbTopic |
18
|
1 |
|
from .transfer.models import BoardID, CategoryID, TopicID |
19
|
|
|
|
20
|
|
|
|
21
|
1 |
|
def count_topics_for_board(board_id: BoardID) -> int: |
22
|
|
|
"""Return the number of topics for that board.""" |
23
|
1 |
|
return DbTopic.query \ |
24
|
|
|
.join(DbCategory) \ |
25
|
|
|
.filter(DbCategory.board_id == board_id) \ |
26
|
|
|
.count() |
27
|
|
|
|
28
|
|
|
|
29
|
1 |
|
def find_topic_by_id(topic_id: TopicID) -> Optional[DbTopic]: |
30
|
|
|
"""Return the topic with that id, or `None` if not found.""" |
31
|
1 |
|
return db.session.query(DbTopic).get(topic_id) |
32
|
|
|
|
33
|
|
|
|
34
|
1 |
|
def get_topic(topic_id: TopicID) -> DbTopic: |
35
|
|
|
"""Return the topic with that id.""" |
36
|
1 |
|
topic = find_topic_by_id(topic_id) |
37
|
|
|
|
38
|
1 |
|
if topic is None: |
39
|
|
|
raise ValueError(f'Unknown topic ID "{topic_id}"') |
40
|
|
|
|
41
|
1 |
|
return topic |
42
|
|
|
|
43
|
|
|
|
44
|
1 |
|
def find_topic_visible_for_user( |
45
|
|
|
topic_id: TopicID, include_hidden: bool |
46
|
|
|
) -> Optional[DbTopic]: |
47
|
|
|
"""Return the topic with that id, or `None` if not found or |
48
|
|
|
invisible for the user. |
49
|
|
|
""" |
50
|
1 |
|
query = DbTopic.query \ |
51
|
|
|
.options( |
52
|
|
|
db.joinedload(DbTopic.category), |
53
|
|
|
) |
54
|
|
|
|
55
|
1 |
|
if not include_hidden: |
56
|
1 |
|
query = query.without_hidden() |
57
|
|
|
|
58
|
1 |
|
return query \ |
59
|
|
|
.filter_by(id=topic_id) \ |
60
|
|
|
.first() |
61
|
|
|
|
62
|
|
|
|
63
|
1 |
|
def get_recent_topics( |
64
|
|
|
board_id: BoardID, include_hidden: bool, limit: int |
65
|
|
|
) -> list[DbTopic]: |
66
|
|
|
"""Paginate topics in that board.""" |
67
|
1 |
|
return _query_topics(include_hidden) \ |
68
|
|
|
.join(DbCategory) \ |
69
|
|
|
.filter(DbCategory.board_id == board_id) \ |
70
|
|
|
.filter(DbCategory.hidden == False) \ |
71
|
|
|
.order_by(DbTopic.last_updated_at.desc()) \ |
72
|
|
|
.limit(limit) \ |
73
|
|
|
.all() |
74
|
|
|
|
75
|
|
|
|
76
|
1 |
|
def paginate_topics( |
77
|
|
|
board_id: BoardID, include_hidden: bool, page: int, topics_per_page: int |
78
|
|
|
) -> Pagination: |
79
|
|
|
"""Paginate topics in that board.""" |
80
|
1 |
|
return _query_topics(include_hidden) \ |
81
|
|
|
.join(DbCategory) \ |
82
|
|
|
.filter(DbCategory.board_id == board_id) \ |
83
|
|
|
.filter(DbCategory.hidden == False) \ |
84
|
|
|
.order_by(DbTopic.last_updated_at.desc()) \ |
85
|
|
|
.paginate(page, topics_per_page) |
86
|
|
|
|
87
|
|
|
|
88
|
1 |
|
def get_all_topic_ids_in_category(category_id: CategoryID) -> set[TopicID]: |
89
|
|
|
"""Return the IDs of all topics in the category.""" |
90
|
1 |
|
rows = db.session \ |
91
|
|
|
.query(DbTopic.id) \ |
92
|
|
|
.filter(DbTopic.category_id == category_id) \ |
93
|
|
|
.all() |
94
|
|
|
|
95
|
1 |
|
return {row[0] for row in rows} |
96
|
|
|
|
97
|
|
|
|
98
|
1 |
|
def paginate_topics_of_category( |
99
|
|
|
category_id: CategoryID, |
100
|
|
|
include_hidden: bool, |
101
|
|
|
page: int, |
102
|
|
|
topics_per_page: int, |
103
|
|
|
) -> Pagination: |
104
|
|
|
"""Paginate topics in that category, as visible for the user. |
105
|
|
|
|
106
|
|
|
Pinned topics are returned first. |
107
|
|
|
""" |
108
|
1 |
|
return _query_topics(include_hidden) \ |
109
|
|
|
.for_category(category_id) \ |
110
|
|
|
.order_by(DbTopic.pinned.desc(), DbTopic.last_updated_at.desc()) \ |
111
|
|
|
.paginate(page, topics_per_page) |
112
|
|
|
|
113
|
|
|
|
114
|
1 |
|
def _query_topics(include_hidden: bool) -> Query: |
115
|
1 |
|
query = DbTopic.query \ |
116
|
|
|
.options( |
117
|
|
|
db.joinedload(DbTopic.category), |
118
|
|
|
db.joinedload(DbTopic.last_updated_by), |
119
|
|
|
db.joinedload(DbTopic.hidden_by), |
120
|
|
|
db.joinedload(DbTopic.locked_by), |
121
|
|
|
db.joinedload(DbTopic.pinned_by), |
122
|
|
|
) |
123
|
|
|
|
124
|
1 |
|
if not include_hidden: |
125
|
1 |
|
query = query.without_hidden() |
126
|
|
|
|
127
|
1 |
|
return query |
128
|
|
|
|
129
|
|
|
|
130
|
1 |
|
def find_default_posting_to_jump_to( |
131
|
|
|
topic_id: TopicID, include_hidden: bool, last_viewed_at: datetime |
132
|
|
|
) -> Optional[DbPosting]: |
133
|
|
|
"""Return the posting of the topic to show by default, or `None`.""" |
134
|
|
|
postings_query = DbPosting.query.for_topic(topic_id) |
135
|
|
|
if not include_hidden: |
136
|
|
|
postings_query = postings_query.without_hidden() |
137
|
|
|
|
138
|
|
|
first_new_posting = postings_query \ |
139
|
|
|
.filter(DbPosting.created_at > last_viewed_at) \ |
140
|
|
|
.earliest_to_latest() \ |
141
|
|
|
.first() |
142
|
|
|
|
143
|
|
|
if first_new_posting is None: |
144
|
|
|
# Current user has seen all postings so far, so show the last one. |
145
|
|
|
return postings_query \ |
146
|
|
|
.latest_to_earliest() \ |
147
|
|
|
.first() |
148
|
|
|
|
149
|
|
|
return first_new_posting |
150
|
|
|
|