1
|
|
|
""" |
2
|
|
|
byceps.services.board.posting_query_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2021 Jochen Kupperschmidt |
6
|
|
|
:License: Revised BSD (see `LICENSE` file for details) |
7
|
|
|
""" |
8
|
|
|
|
9
|
1 |
|
from __future__ import annotations |
10
|
1 |
|
from typing import Optional |
11
|
|
|
|
12
|
1 |
|
from ...database import db, Pagination |
13
|
1 |
|
from ...typing import PartyID, UserID |
14
|
1 |
|
from ...util.iterables import index_of |
15
|
|
|
|
16
|
1 |
|
from ..user import service as user_service |
17
|
1 |
|
from ..user.transfer.models import User |
18
|
|
|
|
19
|
1 |
|
from .dbmodels.category import Category as DbCategory |
20
|
1 |
|
from .dbmodels.posting import Posting as DbPosting |
21
|
1 |
|
from .dbmodels.topic import Topic as DbTopic |
22
|
1 |
|
from .transfer.models import BoardID, PostingID, TopicID |
23
|
|
|
|
24
|
|
|
|
25
|
1 |
|
def count_postings_for_board(board_id: BoardID) -> int: |
26
|
|
|
"""Return the number of postings for that board.""" |
27
|
1 |
|
return DbPosting.query \ |
28
|
|
|
.join(DbTopic).join(DbCategory).filter(DbCategory.board_id == board_id) \ |
29
|
|
|
.count() |
30
|
|
|
|
31
|
|
|
|
32
|
1 |
|
def find_posting_by_id(posting_id: PostingID) -> Optional[DbPosting]: |
33
|
|
|
"""Return the posting with that id, or `None` if not found.""" |
34
|
1 |
|
return db.session.query(DbPosting).get(posting_id) |
35
|
|
|
|
36
|
|
|
|
37
|
1 |
|
def get_posting(posting_id: PostingID) -> DbPosting: |
38
|
|
|
"""Return the posting with that id.""" |
39
|
1 |
|
posting = find_posting_by_id(posting_id) |
40
|
|
|
|
41
|
1 |
|
if posting is None: |
42
|
|
|
raise ValueError(f'Unknown posting ID "{posting_id}"') |
43
|
|
|
|
44
|
1 |
|
return posting |
45
|
|
|
|
46
|
|
|
|
47
|
1 |
|
def paginate_postings( |
48
|
|
|
topic_id: TopicID, |
49
|
|
|
include_hidden: bool, |
50
|
|
|
party_id: Optional[PartyID], |
51
|
|
|
page: int, |
52
|
|
|
postings_per_page: int, |
53
|
|
|
) -> Pagination: |
54
|
|
|
"""Paginate postings in that topic, as visible for the user.""" |
55
|
1 |
|
query = DbPosting.query \ |
56
|
|
|
.options( |
57
|
|
|
db.joinedload(DbPosting.topic), |
58
|
|
|
db.joinedload(DbPosting.last_edited_by).load_only('screen_name'), |
59
|
|
|
db.joinedload(DbPosting.hidden_by).load_only('screen_name'), |
60
|
|
|
) \ |
61
|
|
|
.for_topic(topic_id) |
62
|
|
|
|
63
|
1 |
|
if not include_hidden: |
64
|
1 |
|
query = query.without_hidden() |
65
|
|
|
|
66
|
1 |
|
postings = query \ |
67
|
|
|
.earliest_to_latest() \ |
68
|
|
|
.paginate(page, postings_per_page) |
69
|
|
|
|
70
|
1 |
|
creator_ids = {posting.creator_id for posting in postings.items} |
71
|
1 |
|
creators_by_id = _get_users_by_id(creator_ids, party_id) |
72
|
|
|
|
73
|
1 |
|
for posting in postings.items: |
74
|
1 |
|
posting.creator = creators_by_id[posting.creator_id] |
75
|
|
|
|
76
|
1 |
|
return postings |
77
|
|
|
|
78
|
|
|
|
79
|
1 |
|
def _get_users_by_id( |
80
|
|
|
user_ids: set[UserID], party_id: Optional[PartyID] |
81
|
|
|
) -> dict[UserID, User]: |
82
|
1 |
|
users = user_service.find_users( |
83
|
|
|
user_ids, include_avatars=True, include_orga_flags_for_party_id=party_id |
84
|
|
|
) |
85
|
1 |
|
return user_service.index_users_by_id(users) |
86
|
|
|
|
87
|
|
|
|
88
|
1 |
|
def calculate_posting_page_number( |
89
|
|
|
posting: DbPosting, include_hidden: bool, postings_per_page: int |
90
|
|
|
) -> int: |
91
|
|
|
"""Return the number of the page the posting should appear on.""" |
92
|
1 |
|
query = DbPosting.query \ |
93
|
|
|
.for_topic(posting.topic_id) |
94
|
|
|
|
95
|
1 |
|
if not include_hidden: |
96
|
1 |
|
query = query.without_hidden() |
97
|
|
|
|
98
|
1 |
|
topic_postings = query \ |
99
|
|
|
.earliest_to_latest() \ |
100
|
|
|
.all() |
101
|
|
|
|
102
|
1 |
|
index = index_of(topic_postings, lambda p: p == posting) |
103
|
1 |
|
if index is None: |
104
|
1 |
|
return 1 # Shouldn't happen. |
105
|
|
|
|
106
|
|
|
return divmod(index, postings_per_page)[0] + 1 |
107
|
|
|
|