Passed
Branch main (854eb5)
by Jochen
04:24
created

byceps.services.board.posting_query_service   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 107
Duplicated Lines 0 %

Test Coverage

Coverage 97.62%

Importance

Changes 0
Metric Value
eloc 63
dl 0
loc 107
ccs 41
cts 42
cp 0.9762
rs 10
c 0
b 0
f 0
wmc 12

6 Functions

Rating   Name   Duplication   Size   Complexity  
A find_posting_by_id() 0 3 1
A count_postings_for_board() 0 5 1
A get_posting() 0 8 2
A calculate_posting_page_number() 0 19 4
A paginate_postings() 0 30 3
A _get_users_by_id() 0 7 1
1
"""
2
byceps.services.board.posting_query_service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from typing import Optional
11
12 1
from ...database import db, Pagination
13 1
from ...typing import PartyID, UserID
14 1
from ...util.iterables import index_of
15
16 1
from ..user import service as user_service
17 1
from ..user.transfer.models import User
18
19 1
from .dbmodels.category import Category as DbCategory
20 1
from .dbmodels.posting import Posting as DbPosting
21 1
from .dbmodels.topic import Topic as DbTopic
22 1
from .transfer.models import BoardID, PostingID, TopicID
23
24
25 1
def count_postings_for_board(board_id: BoardID) -> int:
26
    """Return the number of postings for that board."""
27 1
    return DbPosting.query \
28
        .join(DbTopic).join(DbCategory).filter(DbCategory.board_id == board_id) \
29
        .count()
30
31
32 1
def find_posting_by_id(posting_id: PostingID) -> Optional[DbPosting]:
33
    """Return the posting with that id, or `None` if not found."""
34 1
    return db.session.query(DbPosting).get(posting_id)
35
36
37 1
def get_posting(posting_id: PostingID) -> DbPosting:
38
    """Return the posting with that id."""
39 1
    posting = find_posting_by_id(posting_id)
40
41 1
    if posting is None:
42
        raise ValueError(f'Unknown posting ID "{posting_id}"')
43
44 1
    return posting
45
46
47 1
def paginate_postings(
48
    topic_id: TopicID,
49
    include_hidden: bool,
50
    party_id: Optional[PartyID],
51
    page: int,
52
    postings_per_page: int,
53
) -> Pagination:
54
    """Paginate postings in that topic, as visible for the user."""
55 1
    query = DbPosting.query \
56
        .options(
57
            db.joinedload(DbPosting.topic),
58
            db.joinedload(DbPosting.last_edited_by).load_only('screen_name'),
59
            db.joinedload(DbPosting.hidden_by).load_only('screen_name'),
60
        ) \
61
        .for_topic(topic_id)
62
63 1
    if not include_hidden:
64 1
        query = query.without_hidden()
65
66 1
    postings = query \
67
        .earliest_to_latest() \
68
        .paginate(page, postings_per_page)
69
70 1
    creator_ids = {posting.creator_id for posting in postings.items}
71 1
    creators_by_id = _get_users_by_id(creator_ids, party_id)
72
73 1
    for posting in postings.items:
74 1
        posting.creator = creators_by_id[posting.creator_id]
75
76 1
    return postings
77
78
79 1
def _get_users_by_id(
80
    user_ids: set[UserID], party_id: Optional[PartyID]
81
) -> dict[UserID, User]:
82 1
    users = user_service.find_users(
83
        user_ids, include_avatars=True, include_orga_flags_for_party_id=party_id
84
    )
85 1
    return user_service.index_users_by_id(users)
86
87
88 1
def calculate_posting_page_number(
89
    posting: DbPosting, include_hidden: bool, postings_per_page: int
90
) -> int:
91
    """Return the number of the page the posting should appear on."""
92 1
    query = DbPosting.query \
93
        .for_topic(posting.topic_id)
94
95 1
    if not include_hidden:
96 1
        query = query.without_hidden()
97
98 1
    topic_postings = query \
99
        .earliest_to_latest() \
100
        .all()
101
102 1
    index = index_of(topic_postings, lambda p: p == posting)
103 1
    if index is None:
104 1
        return 1  # Shouldn't happen.
105
106
    return divmod(index, postings_per_page)[0] + 1
107