1
|
|
|
""" |
2
|
|
|
byceps.services.tourney.match_comment_service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2019 Jochen Kupperschmidt |
6
|
|
|
:License: Modified BSD, see LICENSE for details. |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
from datetime import datetime |
10
|
|
|
from typing import Dict, Optional, Sequence, Set |
11
|
|
|
|
12
|
|
|
from ...database import db |
13
|
|
|
from ...services.text_markup import service as text_markup_service |
14
|
|
|
from ...services.user import service as user_service |
15
|
|
|
from ...services.user.transfer.models import User |
16
|
|
|
from ...typing import PartyID, UserID |
17
|
|
|
|
18
|
|
|
from .models.match_comment import MatchComment as DbMatchComment |
19
|
|
|
from .transfer.models import MatchID, MatchComment, MatchCommentID |
20
|
|
|
|
21
|
|
|
|
22
|
|
|
def find_comment(
    comment_id: MatchCommentID, *, party_id: Optional[PartyID] = None
) -> Optional[MatchComment]:
    """Look up a single match comment by its ID.

    The users referenced by the comment (creator, last editor, and the
    user who hid it) are looked up via `_find_user`, with `party_id`
    passed along to each lookup.

    Return `None` if no comment with that ID exists.
    """
    db_comment = DbMatchComment.query.get(comment_id)
    if db_comment is None:
        return None

    author = _find_user(db_comment.created_by_id, party_id)

    # Last editor and moderator are looked up only if the respective
    # user ID is set on the comment.
    editor = (
        _find_user(db_comment.last_edited_by_id, party_id)
        if db_comment.last_edited_by_id
        else None
    )
    hider = (
        _find_user(db_comment.hidden_by_id, party_id)
        if db_comment.hidden_by_id
        else None
    )

    return _db_entity_to_comment(
        db_comment, author, last_editor=editor, moderator=hider
    )
47
|
|
|
|
48
|
|
|
|
49
|
|
|
def get_comment(
    comment_id: MatchCommentID, *, party_id: Optional[PartyID] = None
) -> MatchComment:
    """Return the comment.

    `party_id` is forwarded to `find_comment`, which uses it when
    looking up the users attached to the comment.

    Raise `ValueError` if the comment ID is unknown.
    """
    # Forward `party_id`; previously it was accepted but silently dropped.
    comment = find_comment(comment_id, party_id=party_id)

    if comment is None:
        raise ValueError('Unknown match comment ID')

    return comment
62
|
|
|
|
63
|
|
|
|
64
|
|
|
def get_comments(
    match_id: MatchID,
    *,
    party_id: Optional[PartyID] = None,
    include_hidden: bool = False,
) -> Sequence[MatchComment]:
    """Return comments on the match, ordered chronologically.

    Comments flagged as hidden are left out unless `include_hidden` is
    true. `party_id` is passed along to the user lookups.
    """
    # Restrict to the match once; a second `for_match` call is redundant.
    query = DbMatchComment.query.for_match(match_id)

    if not include_hidden:
        query = query.filter_by(hidden=False)

    db_comments = query.order_by(DbMatchComment.created_at).all()

    # Collect the referenced user IDs per role and look each set up with
    # a single `_get_users_by_id` call instead of one call per comment.
    creator_ids = {db_comment.created_by_id for db_comment in db_comments}
    creators_by_id = _get_users_by_id(creator_ids, party_id=party_id)

    last_editor_ids = {
        db_comment.last_edited_by_id
        for db_comment in db_comments
        if db_comment.last_edited_by_id
    }
    last_editors_by_id = _get_users_by_id(last_editor_ids, party_id=party_id)

    moderator_ids = {
        db_comment.hidden_by_id
        for db_comment in db_comments
        if db_comment.hidden_by_id
    }
    moderators_by_id = _get_users_by_id(moderator_ids, party_id=party_id)

    # The creator is indexed directly (a missing entry raises `KeyError`);
    # last editor and moderator are optional and fall back to `None`.
    return [
        _db_entity_to_comment(
            db_comment,
            creators_by_id[db_comment.created_by_id],
            last_editor=last_editors_by_id.get(db_comment.last_edited_by_id),
            moderator=moderators_by_id.get(db_comment.hidden_by_id),
        )
        for db_comment in db_comments
    ]
112
|
|
|
|
113
|
|
|
|
114
|
|
|
def _get_users_by_id(
    user_ids: Set[UserID], *, party_id: Optional[PartyID] = None
) -> Dict[UserID, User]:
    """Look up the users with the given IDs and return them indexed by ID.

    Avatars are requested, and `party_id` is passed as the party to
    include orga flags for.
    """
    return user_service.index_users_by_id(
        user_service.find_users(
            user_ids,
            include_avatars=True,
            include_orga_flags_for_party_id=party_id,
        )
    )
122
|
|
|
|
123
|
|
|
|
124
|
|
|
def create_comment(
    match_id: MatchID, creator_id: UserID, body: str
) -> MatchComment:
    """Create a comment on a match.

    The new database entity is added to the session and committed. The
    comment is then returned as obtained from `get_comment`, i.e. as a
    `MatchComment` rather than as the database entity.
    """
    db_comment = DbMatchComment(match_id, creator_id, body)

    db.session.add(db_comment)
    db.session.commit()

    return get_comment(db_comment.id)
134
|
|
|
|
135
|
|
|
|
136
|
|
|
def update_comment(
    comment_id: MatchCommentID, editor_id: UserID, body: str
) -> MatchComment:
    """Update a comment on a match.

    Replace the comment's body, record `editor_id` and the current UTC
    time as the last edit, and commit. The comment is then returned as
    obtained from `get_comment`, i.e. as a `MatchComment` rather than as
    the database entity.

    Raise `ValueError` if the comment ID is unknown.
    """
    db_comment = _get_db_comment(comment_id)

    db_comment.body = body
    db_comment.last_edited_at = datetime.utcnow()
    db_comment.last_edited_by_id = editor_id

    db.session.commit()

    return get_comment(db_comment.id)
149
|
|
|
|
150
|
|
|
|
151
|
|
|
def hide_comment(comment_id: MatchCommentID, initiator_id: UserID) -> None:
    """Flag the match comment as hidden.

    Record the current UTC time and `initiator_id` as the time and user
    of the hiding, then commit.

    Raise `ValueError` if the comment ID is unknown.
    """
    db_comment = _get_db_comment(comment_id)
    now = datetime.utcnow()

    db_comment.hidden_by_id = initiator_id
    db_comment.hidden_at = now
    db_comment.hidden = True

    db.session.commit()
160
|
|
|
|
161
|
|
|
|
162
|
|
|
def unhide_comment(comment_id: MatchCommentID, initiator_id: UserID) -> None:
    """Make the match comment visible again.

    Clear the hidden flag together with the recorded hiding time and
    hiding user, then commit. `initiator_id` is accepted but not used
    by this function.

    Raise `ValueError` if the comment ID is unknown.
    """
    db_comment = _get_db_comment(comment_id)

    db_comment.hidden_by_id = None
    db_comment.hidden_at = None
    db_comment.hidden = False

    db.session.commit()
171
|
|
|
|
172
|
|
|
|
173
|
|
|
def _get_db_comment(comment_id: MatchCommentID) -> DbMatchComment:
    """Fetch the comment's database entity by ID.

    Raise `ValueError` if no comment with that ID exists.
    """
    db_comment = DbMatchComment.query.get(comment_id)

    if db_comment is not None:
        return db_comment

    raise ValueError('Unknown match comment ID')
184
|
|
|
|
185
|
|
|
|
186
|
|
|
def _find_user(
    user_id: UserID, party_id: Optional[PartyID] = None
) -> Optional[User]:
    """Look up a single user via the user service.

    The avatar is requested, and `party_id` is passed as the party to
    include the orga flag for.
    """
    user = user_service.find_user(
        user_id,
        include_avatar=True,
        include_orga_flag_for_party_id=party_id,
    )
    return user
192
|
|
|
|
193
|
|
|
|
194
|
|
|
def _db_entity_to_comment(
    comment: DbMatchComment,
    creator: User,
    *,
    last_editor: Optional[User],
    moderator: Optional[User],
) -> MatchComment:
    """Build a `MatchComment` from the database entity and its users.

    The comment body is passed along both as stored and as HTML rendered
    by the text markup service.
    """
    rendered_body = text_markup_service.render_html(comment.body)

    # Arguments are passed positionally, in the order the original code
    # uses; grouped here by what they describe.
    return MatchComment(
        comment.id, comment.match_id,
        comment.created_at, creator,
        comment.body, rendered_body,
        comment.last_edited_at, last_editor,
        comment.hidden, comment.hidden_at, moderator,
    )
216
|
|
|
|