1
|
|
|
""" |
2
|
|
|
byceps.services.snippet.service |
3
|
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
4
|
|
|
|
5
|
|
|
:Copyright: 2006-2019 Jochen Kupperschmidt |
6
|
|
|
:License: Modified BSD, see LICENSE for details. |
7
|
|
|
""" |
8
|
|
|
|
9
|
|
|
from typing import List, Optional, Sequence, Set, Tuple |
10
|
|
|
|
11
|
|
|
from ...database import db |
12
|
|
|
from ...events.snippet import SnippetCreated, SnippetUpdated |
13
|
|
|
from ...typing import UserID |
14
|
|
|
|
15
|
|
|
from .models.snippet import ( |
16
|
|
|
CurrentVersionAssociation as DbCurrentVersionAssociation, |
17
|
|
|
Snippet as DbSnippet, |
18
|
|
|
SnippetVersion as DbSnippetVersion, |
19
|
|
|
) |
20
|
|
|
from .transfer.models import Scope, SnippetID, SnippetType, SnippetVersionID |
21
|
|
|
|
22
|
|
|
|
23
|
|
|
# -------------------------------------------------------------------- # |
24
|
|
|
# document |
25
|
|
|
|
26
|
|
|
|
27
|
|
|
def create_document( |
28
|
|
|
scope: Scope, |
29
|
|
|
name: str, |
30
|
|
|
creator_id: UserID, |
31
|
|
|
title: str, |
32
|
|
|
body: str, |
33
|
|
|
*, |
34
|
|
|
head: Optional[str] = None, |
35
|
|
|
image_url_path: Optional[str] = None, |
36
|
|
|
) -> Tuple[DbSnippetVersion, SnippetCreated]: |
37
|
|
|
"""Create a document and its initial version, and return that version.""" |
38
|
|
|
return _create_snippet( |
39
|
|
|
scope, |
40
|
|
|
name, |
41
|
|
|
SnippetType.document, |
42
|
|
|
creator_id, |
43
|
|
|
body, |
44
|
|
|
title=title, |
45
|
|
|
head=head, |
46
|
|
|
image_url_path=image_url_path, |
47
|
|
|
) |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
def update_document( |
51
|
|
|
snippet_id: SnippetID, |
52
|
|
|
creator_id: UserID, |
53
|
|
|
title: str, |
54
|
|
|
body: str, |
55
|
|
|
*, |
56
|
|
|
head: Optional[str] = None, |
57
|
|
|
image_url_path: Optional[str] = None, |
58
|
|
|
) -> Tuple[DbSnippetVersion, SnippetUpdated]: |
59
|
|
|
"""Update document with a new version, and return that version.""" |
60
|
|
|
return _update_snippet( |
61
|
|
|
snippet_id, creator_id, title, head, body, image_url_path |
62
|
|
|
) |
63
|
|
|
|
64
|
|
|
|
65
|
|
|
# -------------------------------------------------------------------- # |
66
|
|
|
# fragment |
67
|
|
|
|
68
|
|
|
|
69
|
|
|
def create_fragment( |
70
|
|
|
scope: Scope, name: str, creator_id: UserID, body: str |
71
|
|
|
) -> Tuple[DbSnippetVersion, SnippetCreated]: |
72
|
|
|
"""Create a fragment and its initial version, and return that version.""" |
73
|
|
|
return _create_snippet(scope, name, SnippetType.fragment, creator_id, body) |
74
|
|
|
|
75
|
|
|
|
76
|
|
|
def update_fragment( |
77
|
|
|
snippet_id: SnippetID, creator_id: UserID, body: str |
78
|
|
|
) -> Tuple[DbSnippetVersion, SnippetUpdated]: |
79
|
|
|
"""Update fragment with a new version, and return that version.""" |
80
|
|
|
title = None |
81
|
|
|
head = None |
82
|
|
|
image_url_path = None |
83
|
|
|
|
84
|
|
|
return _update_snippet( |
85
|
|
|
snippet_id, creator_id, title, head, body, image_url_path |
86
|
|
|
) |
87
|
|
|
|
88
|
|
|
|
89
|
|
|
# -------------------------------------------------------------------- # |
90
|
|
|
# snippet |
91
|
|
|
|
92
|
|
|
|
93
|
|
|
def _create_snippet( |
94
|
|
|
scope: Scope, |
95
|
|
|
name: str, |
96
|
|
|
type_: SnippetType, |
97
|
|
|
creator_id: UserID, |
98
|
|
|
body: str, |
99
|
|
|
*, |
100
|
|
|
title: Optional[str] = None, |
101
|
|
|
head: Optional[str] = None, |
102
|
|
|
image_url_path: Optional[str] = None, |
103
|
|
|
) -> Tuple[DbSnippetVersion, SnippetCreated]: |
104
|
|
|
"""Create a snippet and its initial version, and return that version.""" |
105
|
|
|
snippet = DbSnippet(scope, name, type_) |
106
|
|
|
db.session.add(snippet) |
107
|
|
|
|
108
|
|
|
version = DbSnippetVersion( |
109
|
|
|
snippet, creator_id, title, head, body, image_url_path |
110
|
|
|
) |
111
|
|
|
db.session.add(version) |
112
|
|
|
|
113
|
|
|
current_version_association = DbCurrentVersionAssociation(snippet, version) |
114
|
|
|
db.session.add(current_version_association) |
115
|
|
|
|
116
|
|
|
db.session.commit() |
117
|
|
|
|
118
|
|
|
event = SnippetCreated( |
119
|
|
|
occurred_at=version.created_at, snippet_version_id=version.id |
120
|
|
|
) |
121
|
|
|
|
122
|
|
|
return version, event |
123
|
|
|
|
124
|
|
|
|
125
|
|
|
def _update_snippet( |
126
|
|
|
snippet_id: SnippetID, |
127
|
|
|
creator_id: UserID, |
128
|
|
|
title: Optional[str], |
129
|
|
|
head: Optional[str], |
130
|
|
|
body: str, |
131
|
|
|
image_url_path: Optional[str], |
132
|
|
|
) -> Tuple[DbSnippetVersion, SnippetUpdated]: |
133
|
|
|
"""Update snippet with a new version, and return that version.""" |
134
|
|
|
snippet = find_snippet(snippet_id) |
135
|
|
|
if snippet is None: |
136
|
|
|
raise ValueError('Unknown snippet ID') |
137
|
|
|
|
138
|
|
|
version = DbSnippetVersion( |
139
|
|
|
snippet, creator_id, title, head, body, image_url_path |
140
|
|
|
) |
141
|
|
|
db.session.add(version) |
142
|
|
|
|
143
|
|
|
snippet.current_version = version |
144
|
|
|
|
145
|
|
|
db.session.commit() |
146
|
|
|
|
147
|
|
|
event = SnippetUpdated( |
148
|
|
|
occurred_at=version.created_at, snippet_version_id=version.id |
149
|
|
|
) |
150
|
|
|
|
151
|
|
|
return version, event |
152
|
|
|
|
153
|
|
|
|
154
|
|
|
def delete_snippet(snippet_id: SnippetID) -> bool: |
155
|
|
|
"""Delete the snippet and its versions. |
156
|
|
|
|
157
|
|
|
It is expected that no database records (mountpoints, consents, |
158
|
|
|
etc.) refer to the snippet anymore. |
159
|
|
|
|
160
|
|
|
Return `True` on success, or `False` if an error occured. |
161
|
|
|
""" |
162
|
|
|
snippet = find_snippet(snippet_id) |
163
|
|
|
if snippet is None: |
164
|
|
|
raise ValueError('Unknown snippet ID') |
165
|
|
|
|
166
|
|
|
db.session.delete(snippet.current_version_association) |
167
|
|
|
|
168
|
|
|
versions = get_versions(snippet_id) |
169
|
|
|
for version in versions: |
170
|
|
|
db.session.delete(version) |
171
|
|
|
|
172
|
|
|
db.session.delete(snippet) |
173
|
|
|
|
174
|
|
|
try: |
175
|
|
|
db.session.commit() |
176
|
|
|
return True |
177
|
|
|
except Exception: |
178
|
|
|
db.session.rollback() |
179
|
|
|
return False |
180
|
|
|
|
181
|
|
|
|
182
|
|
|
def find_snippet(snippet_id: SnippetID) -> Optional[DbSnippet]: |
183
|
|
|
"""Return the snippet with that id, or `None` if not found.""" |
184
|
|
|
return DbSnippet.query.get(snippet_id) |
185
|
|
|
|
186
|
|
|
|
187
|
|
|
def get_snippets(snippet_ids: Set[SnippetID]) -> Sequence[DbSnippet]: |
188
|
|
|
"""Return these snippets.""" |
189
|
|
|
return DbSnippet.query \ |
190
|
|
|
.filter(DbSnippet.id.in_(snippet_ids)) \ |
191
|
|
|
.all() |
192
|
|
|
|
193
|
|
|
|
194
|
|
|
def get_snippets_for_scope_with_current_versions( |
195
|
|
|
scope: Scope |
196
|
|
|
) -> Sequence[DbSnippet]: |
197
|
|
|
"""Return all snippets with their current versions for that scope.""" |
198
|
|
|
return DbSnippet.query \ |
199
|
|
|
.filter_by(scope_type=scope.type_) \ |
200
|
|
|
.filter_by(scope_name=scope.name) \ |
201
|
|
|
.options( |
202
|
|
|
db.joinedload('current_version_association').joinedload('version') |
203
|
|
|
) \ |
204
|
|
|
.all() |
205
|
|
|
|
206
|
|
|
|
207
|
|
|
def find_snippet_version( |
208
|
|
|
version_id: SnippetVersionID |
209
|
|
|
) -> Optional[DbSnippetVersion]: |
210
|
|
|
"""Return the snippet version with that id, or `None` if not found.""" |
211
|
|
|
return DbSnippetVersion.query.get(version_id) |
212
|
|
|
|
213
|
|
|
|
214
|
|
|
def find_current_version_of_snippet_with_name( |
215
|
|
|
scope: Scope, name: str |
216
|
|
|
) -> DbSnippetVersion: |
217
|
|
|
"""Return the current version of the snippet with that name in that |
218
|
|
|
scope, or `None` if not found. |
219
|
|
|
""" |
220
|
|
|
return DbSnippetVersion.query \ |
221
|
|
|
.join(DbCurrentVersionAssociation) \ |
222
|
|
|
.join(DbSnippet) \ |
223
|
|
|
.filter(DbSnippet.scope_type == scope.type_) \ |
224
|
|
|
.filter(DbSnippet.scope_name == scope.name) \ |
225
|
|
|
.filter(DbSnippet.name == name) \ |
226
|
|
|
.one_or_none() |
227
|
|
|
|
228
|
|
|
|
229
|
|
|
def get_versions(snippet_id: SnippetID) -> Sequence[DbSnippetVersion]: |
230
|
|
|
"""Return all versions of that snippet, sorted from most recent to |
231
|
|
|
oldest. |
232
|
|
|
""" |
233
|
|
|
return DbSnippetVersion.query \ |
234
|
|
|
.filter_by(snippet_id=snippet_id) \ |
235
|
|
|
.latest_first() \ |
236
|
|
|
.all() |
237
|
|
|
|
238
|
|
|
|
239
|
|
|
def search_snippets( |
240
|
|
|
search_term: str, scope: Optional[Scope] |
241
|
|
|
) -> List[DbSnippetVersion]: |
242
|
|
|
"""Search in (the latest versions of) snippets.""" |
243
|
|
|
q = DbSnippetVersion.query \ |
244
|
|
|
.join(DbCurrentVersionAssociation) \ |
245
|
|
|
.join(DbSnippet) |
246
|
|
|
|
247
|
|
|
if scope is not None: |
248
|
|
|
q = q \ |
249
|
|
|
.filter(DbSnippet.scope_type == scope.type_) \ |
250
|
|
|
.filter(DbSnippet.scope_name == scope.name) |
251
|
|
|
|
252
|
|
|
return q \ |
253
|
|
|
.filter( |
254
|
|
|
db.or_( |
255
|
|
|
DbSnippetVersion.title.contains(search_term), |
256
|
|
|
DbSnippetVersion.head.contains(search_term), |
257
|
|
|
DbSnippetVersion.body.contains(search_term), |
258
|
|
|
DbSnippetVersion.image_url_path.contains(search_term), |
259
|
|
|
) |
260
|
|
|
) \ |
261
|
|
|
.all() |
262
|
|
|
|
263
|
|
|
|
264
|
|
|
class SnippetNotFound(Exception): |
265
|
|
|
|
266
|
|
|
def __init__(self, scope: Scope, name: str) -> None: |
267
|
|
|
self.scope = scope |
268
|
|
|
self.name = name |
269
|
|
|
|