Passed
Push — master ( 1bb58d...3058b1 )
by Jochen
02:25
created

byceps.services.snippet.service.search_snippets()   A

Complexity

Conditions 2

Size

Total Lines 23
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 17
nop 2
dl 0
loc 23
rs 9.55
c 0
b 0
f 0
1
"""
2
byceps.services.snippet.service
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2019 Jochen Kupperschmidt
6
:License: Modified BSD, see LICENSE for details.
7
"""
8
9
from typing import List, Optional, Sequence, Set, Tuple
10
11
from ...database import db
12
from ...events.snippet import SnippetCreated, SnippetUpdated
13
from ...typing import UserID
14
15
from .models.snippet import (
16
    CurrentVersionAssociation as DbCurrentVersionAssociation,
17
    Snippet as DbSnippet,
18
    SnippetVersion as DbSnippetVersion,
19
)
20
from .transfer.models import Scope, SnippetID, SnippetType, SnippetVersionID
21
22
23
# -------------------------------------------------------------------- #
24
# document
25
26
27
def create_document(
28
    scope: Scope,
29
    name: str,
30
    creator_id: UserID,
31
    title: str,
32
    body: str,
33
    *,
34
    head: Optional[str] = None,
35
    image_url_path: Optional[str] = None,
36
) -> Tuple[DbSnippetVersion, SnippetCreated]:
37
    """Create a document and its initial version, and return that version."""
38
    return _create_snippet(
39
        scope,
40
        name,
41
        SnippetType.document,
42
        creator_id,
43
        body,
44
        title=title,
45
        head=head,
46
        image_url_path=image_url_path,
47
    )
48
49
50
def update_document(
51
    snippet_id: SnippetID,
52
    creator_id: UserID,
53
    title: str,
54
    body: str,
55
    *,
56
    head: Optional[str] = None,
57
    image_url_path: Optional[str] = None,
58
) -> Tuple[DbSnippetVersion, SnippetUpdated]:
59
    """Update document with a new version, and return that version."""
60
    return _update_snippet(
61
        snippet_id, creator_id, title, head, body, image_url_path
62
    )
63
64
65
# -------------------------------------------------------------------- #
66
# fragment
67
68
69
def create_fragment(
70
    scope: Scope, name: str, creator_id: UserID, body: str
71
) -> Tuple[DbSnippetVersion, SnippetCreated]:
72
    """Create a fragment and its initial version, and return that version."""
73
    return _create_snippet(scope, name, SnippetType.fragment, creator_id, body)
74
75
76
def update_fragment(
77
    snippet_id: SnippetID, creator_id: UserID, body: str
78
) -> Tuple[DbSnippetVersion, SnippetUpdated]:
79
    """Update fragment with a new version, and return that version."""
80
    title = None
81
    head = None
82
    image_url_path = None
83
84
    return _update_snippet(
85
        snippet_id, creator_id, title, head, body, image_url_path
86
    )
87
88
89
# -------------------------------------------------------------------- #
90
# snippet
91
92
93
def _create_snippet(
94
    scope: Scope,
95
    name: str,
96
    type_: SnippetType,
97
    creator_id: UserID,
98
    body: str,
99
    *,
100
    title: Optional[str] = None,
101
    head: Optional[str] = None,
102
    image_url_path: Optional[str] = None,
103
) -> Tuple[DbSnippetVersion, SnippetCreated]:
104
    """Create a snippet and its initial version, and return that version."""
105
    snippet = DbSnippet(scope, name, type_)
106
    db.session.add(snippet)
107
108
    version = DbSnippetVersion(
109
        snippet, creator_id, title, head, body, image_url_path
110
    )
111
    db.session.add(version)
112
113
    current_version_association = DbCurrentVersionAssociation(snippet, version)
114
    db.session.add(current_version_association)
115
116
    db.session.commit()
117
118
    event = SnippetCreated(
119
        occurred_at=version.created_at, snippet_version_id=version.id
120
    )
121
122
    return version, event
123
124
125
def _update_snippet(
126
    snippet_id: SnippetID,
127
    creator_id: UserID,
128
    title: Optional[str],
129
    head: Optional[str],
130
    body: str,
131
    image_url_path: Optional[str],
132
) -> Tuple[DbSnippetVersion, SnippetUpdated]:
133
    """Update snippet with a new version, and return that version."""
134
    snippet = find_snippet(snippet_id)
135
    if snippet is None:
136
        raise ValueError('Unknown snippet ID')
137
138
    version = DbSnippetVersion(
139
        snippet, creator_id, title, head, body, image_url_path
140
    )
141
    db.session.add(version)
142
143
    snippet.current_version = version
144
145
    db.session.commit()
146
147
    event = SnippetUpdated(
148
        occurred_at=version.created_at, snippet_version_id=version.id
149
    )
150
151
    return version, event
152
153
154
def delete_snippet(snippet_id: SnippetID) -> bool:
155
    """Delete the snippet and its versions.
156
157
    It is expected that no database records (mountpoints, consents,
158
    etc.) refer to the snippet anymore.
159
160
    Return `True` on success, or `False` if an error occured.
161
    """
162
    snippet = find_snippet(snippet_id)
163
    if snippet is None:
164
        raise ValueError('Unknown snippet ID')
165
166
    db.session.delete(snippet.current_version_association)
167
168
    versions = get_versions(snippet_id)
169
    for version in versions:
170
        db.session.delete(version)
171
172
    db.session.delete(snippet)
173
174
    try:
175
        db.session.commit()
176
        return True
177
    except Exception:
178
        db.session.rollback()
179
        return False
180
181
182
def find_snippet(snippet_id: SnippetID) -> Optional[DbSnippet]:
183
    """Return the snippet with that id, or `None` if not found."""
184
    return DbSnippet.query.get(snippet_id)
185
186
187
def get_snippets(snippet_ids: Set[SnippetID]) -> Sequence[DbSnippet]:
188
    """Return these snippets."""
189
    return DbSnippet.query \
190
        .filter(DbSnippet.id.in_(snippet_ids)) \
191
        .all()
192
193
194
def get_snippets_for_scope_with_current_versions(
195
    scope: Scope
196
) -> Sequence[DbSnippet]:
197
    """Return all snippets with their current versions for that scope."""
198
    return DbSnippet.query \
199
        .filter_by(scope_type=scope.type_) \
200
        .filter_by(scope_name=scope.name) \
201
        .options(
202
            db.joinedload('current_version_association').joinedload('version')
203
        ) \
204
        .all()
205
206
207
def find_snippet_version(
208
    version_id: SnippetVersionID
209
) -> Optional[DbSnippetVersion]:
210
    """Return the snippet version with that id, or `None` if not found."""
211
    return DbSnippetVersion.query.get(version_id)
212
213
214
def find_current_version_of_snippet_with_name(
215
    scope: Scope, name: str
216
) -> DbSnippetVersion:
217
    """Return the current version of the snippet with that name in that
218
    scope, or `None` if not found.
219
    """
220
    return DbSnippetVersion.query \
221
        .join(DbCurrentVersionAssociation) \
222
        .join(DbSnippet) \
223
            .filter(DbSnippet.scope_type == scope.type_) \
224
            .filter(DbSnippet.scope_name == scope.name) \
225
            .filter(DbSnippet.name == name) \
226
        .one_or_none()
227
228
229
def get_versions(snippet_id: SnippetID) -> Sequence[DbSnippetVersion]:
230
    """Return all versions of that snippet, sorted from most recent to
231
    oldest.
232
    """
233
    return DbSnippetVersion.query \
234
        .filter_by(snippet_id=snippet_id) \
235
        .latest_first() \
236
        .all()
237
238
239
def search_snippets(
240
    search_term: str, scope: Optional[Scope]
241
) -> List[DbSnippetVersion]:
242
    """Search in (the latest versions of) snippets."""
243
    q = DbSnippetVersion.query \
244
        .join(DbCurrentVersionAssociation) \
245
        .join(DbSnippet)
246
247
    if scope is not None:
248
        q = q \
249
            .filter(DbSnippet.scope_type == scope.type_) \
250
            .filter(DbSnippet.scope_name == scope.name)
251
252
    return q \
253
            .filter(
254
                db.or_(
255
                    DbSnippetVersion.title.contains(search_term),
256
                    DbSnippetVersion.head.contains(search_term),
257
                    DbSnippetVersion.body.contains(search_term),
258
                    DbSnippetVersion.image_url_path.contains(search_term),
259
                )
260
            ) \
261
        .all()
262
263
264
class SnippetNotFound(Exception):
265
266
    def __init__(self, scope: Scope, name: str) -> None:
267
        self.scope = scope
268
        self.name = name
269