Passed
Push — main ( 38d8f7...b77d07 )
by Jochen
04:19
created

byceps.blueprints.admin.snippet.views   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 665
Duplicated Lines 2.41 %

Test Coverage

Coverage 75.46%

Importance

Changes 0
Metric Value
eloc 414
dl 16
loc 665
ccs 206
cts 273
cp 0.7546
rs 9.36
c 0
b 0
f 0
wmc 38

25 Functions

Rating   Name   Duplication   Size   Complexity  
A history() 0 25 1
A create_fragment_form() 0 17 1
A _find_brand_for_scope() 0 5 2
A view_current_version() 0 9 1
A _get_sites_to_potentially_mount_to() 0 9 3
A _create_html_diff() 16 16 1
A index_mountpoints() 0 23 1
A update_fragment() 0 25 1
A create_fragment() 0 25 1
A create_document() 0 34 1
A compare_fragments() 0 25 2
A update_document_form() 0 21 1
A compare_documents() 0 33 2
A _find_site_for_scope() 0 5 2
A update_document() 0 33 1
A view_version() 0 39 2
A index_for_scope() 0 24 1
A delete_snippet() 0 29 2
A delete_mountpoint() 0 18 2
A create_document_form() 0 17 1
A create_mountpoint_form() 0 23 2
A _find_snippet_by_id() 0 7 2
A create_mountpoint() 0 30 2
A update_fragment_form() 0 21 1
A _find_version() 0 7 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""
2
byceps.blueprints.admin.snippet.views
3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2006-2021 Jochen Kupperschmidt
6
:License: Revised BSD (see `LICENSE` file for details)
7
"""
8
9 1
from __future__ import annotations
10 1
from typing import Optional
11 1
from flask import abort, g, request, url_for
12 1
from flask_babel import gettext
13
14 1
from ....services.brand import service as brand_service
15 1
from ....services.brand.transfer.models import Brand
16 1
from ....services.site import service as site_service
17 1
from ....services.site.transfer.models import Site, SiteID
18 1
from ....services.snippet.dbmodels.snippet import (
19
    Snippet as DbSnippet,
20
    SnippetVersion as DbSnippetVersion,
21
)
22 1
from ....services.snippet import mountpoint_service, service as snippet_service
23 1
from ....services.snippet.transfer.models import (
24
    Scope,
25
    SnippetID,
26
    SnippetVersionID,
27
)
28 1
from ....services.text_diff import service as text_diff_service
29 1
from ....services.user import service as user_service
30 1
from ....signals import snippet as snippet_signals
31 1
from ....util.authorization import register_permission_enum
32 1
from ....util.datetime.format import format_datetime_short
33 1
from ....util.framework.blueprint import create_blueprint
34 1
from ....util.framework.flash import flash_error, flash_success
35 1
from ....util.framework.templating import templated
36 1
from ....util.iterables import pairwise
37 1
from ....util.views import (
38
    permission_required,
39
    redirect_to,
40
    respond_no_content,
41
    respond_no_content_with_location,
42
)
43 1
from ....typing import BrandID
44
45 1
from ...site.snippet.templating import get_snippet_context
46
47 1
from .authorization import SnippetMountpointPermission, SnippetPermission
48 1
from .forms import (
49
    DocumentCreateForm,
50
    DocumentUpdateForm,
51
    FragmentCreateForm,
52
    FragmentUpdateForm,
53
    MountpointCreateForm,
54
)
55
56 1
# Blueprint that all snippet administration views below are attached to.
blueprint = create_blueprint('snippet_admin', __name__)


# Register the permission enums referenced by the views' permission checks.
register_permission_enum(SnippetMountpointPermission)
register_permission_enum(SnippetPermission)
61
62
63 1
@blueprint.get('/for_scope/<scope_type>/<scope_name>')
@permission_required(SnippetPermission.view)
@templated
def index_for_scope(scope_type, scope_name):
    """List the snippets that belong to the given scope.

    The snippets are loaded together with their current versions. The
    creators of those versions are fetched (with avatars) and handed to
    the template indexed by user ID.
    """
    scope = Scope(scope_type, scope_name)

    snippets = snippet_service.get_snippets_for_scope_with_current_versions(
        scope
    )

    creator_ids = {
        snippet.current_version.creator_id for snippet in snippets
    }
    creators = user_service.find_users(creator_ids, include_avatars=True)

    return {
        'scope': scope,
        'snippets': snippets,
        'users_by_id': user_service.index_users_by_id(creators),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
88
89
90 1
@blueprint.get('/snippets/<uuid:snippet_id>/current_version')
@permission_required(SnippetPermission.view)
def view_current_version(snippet_id):
    """Show the snippet's current version.

    Looks up the snippet (aborting with 404 if it does not exist) and
    delegates to `view_version` with the ID of its current version.
    """
    current_version = _find_snippet_by_id(snippet_id).current_version
    return view_version(current_version.id)
99
100
101 1
@blueprint.get('/versions/<uuid:snippet_version_id>')
@permission_required(SnippetPermission.view_history)
@templated
def view_version(snippet_version_id):
    """Show the snippet version with the given version ID.

    The version's title, head, and body are obtained from
    `get_snippet_context`. If that raises, the template context carries
    `error_occurred=True` and the error message instead of the content.
    """
    version = _find_version(snippet_version_id)

    snippet = version.snippet
    scope = snippet.scope

    context = {
        'version': version,
        'scope': scope,
        'creator': user_service.get_user(
            version.creator_id, include_avatar=True
        ),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
        'is_current_version': version.id == snippet.current_version.id,
    }

    # Deliberately broad: whatever goes wrong while building the snippet
    # context is reported in the page rather than failing the request.
    try:
        rendered = get_snippet_context(version)
        content_context = {
            'snippet_title': rendered['page_title'],
            'snippet_head': rendered['head'],
            'snippet_body': rendered['body'],
            'error_occurred': False,
        }
    except Exception as e:
        content_context = {
            'error_occurred': True,
            'error_message': str(e),
        }

    context.update(content_context)
    return context
140
141
142 1
@blueprint.get('/snippets/<uuid:snippet_id>/history')
@permission_required(SnippetPermission.view_history)
@templated
def history(snippet_id):
    """Show the version history of the snippet.

    Versions are handed to the template as pairs so that neighboring
    versions can be related to each other. The creators of all versions
    are fetched (with avatars) and indexed by user ID.
    """
    snippet = _find_snippet_by_id(snippet_id)
    scope = snippet.scope

    versions = snippet_service.get_versions(snippet.id)

    creator_ids = {version.creator_id for version in versions}
    creators = user_service.find_users(creator_ids, include_avatars=True)

    return {
        'scope': scope,
        'snippet': snippet,
        # `None` is appended so that the final version gets a partner,
        # too (assuming `pairwise` yields consecutive pairs).
        'versions_pairwise': list(pairwise(versions + [None])),
        'users_by_id': user_service.index_users_by_id(creators),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
168
169
170
# -------------------------------------------------------------------- #
171
# document
172
173
174 1
@blueprint.get('/for_scope/<scope_type>/<scope_name>/documents/create')
@permission_required(SnippetPermission.create)
@templated
def create_document_form(scope_type, scope_name):
    """Show an empty form to create a document in the given scope."""
    scope = Scope(scope_type, scope_name)

    return {
        'scope': scope,
        'form': DocumentCreateForm(),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
192
193
194 1
@blueprint.post('/for_scope/<scope_type>/<scope_name>/documents')
@permission_required(SnippetPermission.create)
def create_document(scope_type, scope_name):
    """Create a document in the given scope from the submitted form data.

    On success, a flash message is set, the `snippet_created` signal is
    sent, and the client is redirected to the page of the new version.
    """
    scope = Scope(scope_type, scope_name)
    form = DocumentCreateForm(request.form)

    # NOTE(review): the form's data is used without calling
    # `form.validate()` first (unlike `create_mountpoint`) -- confirm
    # that this is intended.
    version, event = snippet_service.create_document(
        scope,
        form.name.data.strip().lower(),
        g.user.id,
        form.title.data.strip(),
        form.body.data.strip(),
        head=form.head.data.strip(),
        image_url_path=form.image_url_path.data.strip(),
    )

    created_message = gettext(
        'Document "%(name)s" has been created.', name=version.snippet.name
    )
    flash_success(created_message)

    snippet_signals.snippet_created.send(None, event=event)

    return redirect_to('.view_version', snippet_version_id=version.id)
228
229
230 1
@blueprint.get('/documents/<uuid:snippet_id>/update')
@permission_required(SnippetPermission.update)
@templated
def update_document_form(snippet_id):
    """Show the form to update a document.

    The form is pre-filled from the snippet's current version and the
    snippet's name.
    """
    snippet = _find_snippet_by_id(snippet_id)
    scope = snippet.scope

    prefilled_form = DocumentUpdateForm(
        obj=snippet.current_version, name=snippet.name
    )

    return {
        'scope': scope,
        'form': prefilled_form,
        'snippet': snippet,
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
252
253
254 1
@blueprint.post('/documents/<uuid:snippet_id>')
@permission_required(SnippetPermission.update)
def update_document(snippet_id):
    """Update a document with the submitted form data.

    On success, a flash message is set, the `snippet_updated` signal is
    sent, and the client is redirected to the page of the version
    returned by the snippet service.
    """
    form = DocumentUpdateForm(request.form)
    snippet = _find_snippet_by_id(snippet_id)

    # NOTE(review): the form's data is used without calling
    # `form.validate()` first -- confirm that this is intended.
    version, event = snippet_service.update_document(
        snippet.id,
        g.user.id,
        form.title.data.strip(),
        form.body.data.strip(),
        head=form.head.data.strip(),
        image_url_path=form.image_url_path.data.strip(),
    )

    updated_message = gettext(
        'Document "%(name)s" has been updated.',
        name=version.snippet.name,
    )
    flash_success(updated_message)

    snippet_signals.snippet_updated.send(None, event=event)

    return redirect_to('.view_version', snippet_version_id=version.id)
287
288
289 1
@blueprint.get(
    '/documents/<uuid:from_version_id>/compare_to/<uuid:to_version_id>'
)
@permission_required(SnippetPermission.view_history)
@templated
def compare_documents(from_version_id, to_version_id):
    """Show how two versions of the same document differ.

    HTML diffs are created for title, head, body, and image URL path.
    Responds with 400 if the versions belong to different snippets.
    """
    from_version = _find_version(from_version_id)
    to_version = _find_version(to_version_id)

    scope = from_version.snippet.scope

    belong_to_same_snippet = (
        from_version.snippet_id == to_version.snippet_id
    )
    if not belong_to_same_snippet:
        abort(400, 'The versions do not belong to the same snippet.')

    def diff(attribute_name):
        return _create_html_diff(from_version, to_version, attribute_name)

    return {
        'scope': scope,
        'diff_title': diff('title'),
        'diff_head': diff('head'),
        'diff_body': diff('body'),
        'diff_image_url_path': diff('image_url_path'),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
323
324
325
# -------------------------------------------------------------------- #
326
# fragment
327
328
329 1
@blueprint.get('/for_scope/<scope_type>/<scope_name>/fragments/create')
@permission_required(SnippetPermission.create)
@templated
def create_fragment_form(scope_type, scope_name):
    """Show an empty form to create a fragment in the given scope."""
    scope = Scope(scope_type, scope_name)

    return {
        'scope': scope,
        'form': FragmentCreateForm(),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
347
348
349 1
@blueprint.post('/for_scope/<scope_type>/<scope_name>/fragments')
@permission_required(SnippetPermission.create)
def create_fragment(scope_type, scope_name):
    """Create a fragment in the given scope from the submitted form data.

    On success, a flash message is set, the `snippet_created` signal is
    sent, and the client is redirected to the page of the new version.
    """
    scope = Scope(scope_type, scope_name)
    form = FragmentCreateForm(request.form)

    # NOTE(review): the form's data is used without calling
    # `form.validate()` first (unlike `create_mountpoint`) -- confirm
    # that this is intended.
    version, event = snippet_service.create_fragment(
        scope,
        form.name.data.strip().lower(),
        g.user.id,
        form.body.data.strip(),
    )

    created_message = gettext(
        'Fragment "%(name)s" has been created.', name=version.snippet.name
    )
    flash_success(created_message)

    snippet_signals.snippet_created.send(None, event=event)

    return redirect_to('.view_version', snippet_version_id=version.id)
374
375
376 1
@blueprint.get('/fragments/<uuid:snippet_id>/update')
@permission_required(SnippetPermission.update)
@templated
def update_fragment_form(snippet_id):
    """Show the form to update a fragment.

    The form is pre-filled from the snippet's current version and the
    snippet's name.
    """
    snippet = _find_snippet_by_id(snippet_id)
    scope = snippet.scope

    prefilled_form = FragmentUpdateForm(
        obj=snippet.current_version, name=snippet.name
    )

    return {
        'scope': scope,
        'form': prefilled_form,
        'snippet': snippet,
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
398
399
400 1
@blueprint.post('/fragments/<uuid:snippet_id>')
@permission_required(SnippetPermission.update)
def update_fragment(snippet_id):
    """Update a fragment with the submitted form data.

    On success, a flash message is set, the `snippet_updated` signal is
    sent, and the client is redirected to the page of the version
    returned by the snippet service.
    """
    form = FragmentUpdateForm(request.form)
    snippet = _find_snippet_by_id(snippet_id)

    # NOTE(review): the form's data is used without calling
    # `form.validate()` first -- confirm that this is intended.
    version, event = snippet_service.update_fragment(
        snippet.id,
        g.user.id,
        form.body.data.strip(),
    )

    updated_message = gettext(
        'Fragment "%(name)s" has been updated.',
        name=version.snippet.name,
    )
    flash_success(updated_message)

    snippet_signals.snippet_updated.send(None, event=event)

    return redirect_to('.view_version', snippet_version_id=version.id)
425
426
427 1
@blueprint.get(
    '/fragments/<uuid:from_version_id>/compare_to/<uuid:to_version_id>'
)
@permission_required(SnippetPermission.view_history)
@templated
def compare_fragments(from_version_id, to_version_id):
    """Show how the bodies of two versions of the same fragment differ.

    Responds with 400 if the versions belong to different snippets.
    """
    from_version = _find_version(from_version_id)
    to_version = _find_version(to_version_id)

    scope = from_version.snippet.scope

    belong_to_same_snippet = (
        from_version.snippet_id == to_version.snippet_id
    )
    if not belong_to_same_snippet:
        abort(400, 'The versions do not belong to the same snippet.')

    return {
        'scope': scope,
        'diff_body': _create_html_diff(from_version, to_version, 'body'),
        'brand': _find_brand_for_scope(scope),
        'site': _find_site_for_scope(scope),
    }
453
454
455
# -------------------------------------------------------------------- #
456
# delete
457
458
459 1
@blueprint.delete('/snippets/<uuid:snippet_id>')
@permission_required(SnippetPermission.delete)
@respond_no_content_with_location
def delete_snippet(snippet_id):
    """Delete a snippet.

    Returns a URL: the scope's snippet index if the snippet was deleted,
    or the snippet's current version page if deletion failed.
    (Presumably the decorator sends it as the `Location` header --
    verify against `respond_no_content_with_location`.)
    """
    snippet = _find_snippet_by_id(snippet_id)

    # Read these before attempting the deletion; they are needed for the
    # flash message and the redirect target afterwards.
    name = snippet.name
    scope = snippet.scope

    success, event = snippet_service.delete_snippet(
        snippet.id, initiator_id=g.user.id
    )

    if success:
        flash_success(
            gettext('Snippet "%(name)s" has been deleted.', name=name)
        )
        snippet_signals.snippet_deleted.send(None, event=event)
        return url_for(
            '.index_for_scope', scope_type=scope.type_, scope_name=scope.name
        )

    flash_error(
        gettext(
            'Snippet "%(snippet_name)s" could not be deleted. Is it still mounted?',
            snippet_name=name,
        )
    )
    return url_for('.view_current_version', snippet_id=snippet.id)
489
490
491
# -------------------------------------------------------------------- #
492
# mountpoint
493
494
495 1
@blueprint.get('/mountpoints/<site_id>')
@permission_required(SnippetPermission.view)
@templated
def index_mountpoints(site_id):
    """List the site's mountpoints, each paired with its snippet."""
    scope = Scope.for_site(site_id)

    mountpoints = mountpoint_service.get_mountpoints_for_site(site_id)

    # Fetch all referenced snippets at once, then pair them up by ID.
    referenced_snippets = snippet_service.get_snippets(
        {mountpoint.snippet_id for mountpoint in mountpoints}
    )
    snippets_by_id = {
        snippet.id: snippet for snippet in referenced_snippets
    }

    return {
        'scope': scope,
        'mountpoints_and_snippets': [
            (mountpoint, snippets_by_id[mountpoint.snippet_id])
            for mountpoint in mountpoints
        ],
        'site': _find_site_for_scope(scope),
    }
519
520
521 1
@blueprint.get('/snippets/<uuid:snippet_id>/mountpoints/create')
@permission_required(SnippetMountpointPermission.create)
@templated
def create_mountpoint_form(snippet_id, *, erroneous_form=None):
    """Show the form to create a mountpoint for the snippet.

    If `erroneous_form` is given (and truthy), it is shown again instead
    of a fresh form. In both cases, the selectable sites are set based
    on the snippet's scope.
    """
    snippet = _find_snippet_by_id(snippet_id)
    scope = snippet.scope

    brand = _find_brand_for_scope(scope)
    site = _find_site_for_scope(scope)

    selectable_sites = _get_sites_to_potentially_mount_to(brand, site)

    form = erroneous_form or MountpointCreateForm()
    form.set_site_id_choices(selectable_sites)

    return {
        'scope': scope,
        'snippet': snippet,
        'form': form,
        'brand': brand,
        'site': site,
    }
545
546
547 1
def _get_sites_to_potentially_mount_to(
548
    brand: Optional[Brand] = None, site: Optional[Site] = None
549
) -> set[Site]:
550 1
    if site is not None:
551
        return {site}
552 1
    elif brand is not None:
553
        return site_service.get_sites_for_brand(brand.id)
554
    else:
555 1
        return site_service.get_all_sites()
556
557
558 1
@blueprint.post('/snippets/<uuid:snippet_id>/mountpoints')
@permission_required(SnippetMountpointPermission.create)
def create_mountpoint(snippet_id):
    """Create a mountpoint for the snippet from the submitted form data.

    If the form does not validate, the creation form is shown again with
    the submitted values. On success, a flash message is set and the
    client is redirected to the mountpoint list of the chosen site.
    """
    snippet = _find_snippet_by_id(snippet_id)

    # Validate the submitted site against the same set of sites the
    # creation form offers for the snippet's scope. Accepting any
    # existing site here would allow a crafted request to mount the
    # snippet to a site the form never offered.
    scope = snippet.scope
    sites = _get_sites_to_potentially_mount_to(
        _find_brand_for_scope(scope), _find_site_for_scope(scope)
    )

    form = MountpointCreateForm(request.form)
    form.set_site_id_choices(sites)

    if not form.validate():
        return create_mountpoint_form(snippet.id, erroneous_form=form)

    site_id = form.site_id.data
    endpoint_suffix = form.endpoint_suffix.data.strip()
    url_path = form.url_path.data.strip()

    mountpoint = mountpoint_service.create_mountpoint(
        site_id, endpoint_suffix, url_path, snippet.id
    )

    flash_success(
        gettext(
            'Mountpoint for "%(url_path)s" has been created.',
            url_path=mountpoint.url_path,
        )
    )

    return redirect_to('.index_mountpoints', site_id=site_id)
588
589
590 1
@blueprint.delete('/mountpoints/<uuid:mountpoint_id>')
@permission_required(SnippetMountpointPermission.delete)
@respond_no_content
def delete_mountpoint(mountpoint_id):
    """Delete a mountpoint.

    Responds with 404 if no mountpoint with that ID exists.
    """
    mountpoint = mountpoint_service.find_mountpoint(mountpoint_id)
    if mountpoint is None:
        abort(404)

    # Keep the path for the flash message shown after the deletion.
    deleted_url_path = mountpoint.url_path

    mountpoint_service.delete_mountpoint(mountpoint.id)

    deleted_message = gettext(
        'Mountpoint for "%(url_path)s" has been deleted.',
        url_path=deleted_url_path,
    )
    flash_success(deleted_message)
610
611
612
# -------------------------------------------------------------------- #
613
# helpers
614
615
616 1
def _find_snippet_by_id(snippet_id: SnippetID) -> DbSnippet:
    """Return the snippet with that ID, or abort with 404 if unknown."""
    found = snippet_service.find_snippet(snippet_id)

    if found is not None:
        return found

    abort(404)
623
624
625 1
def _find_version(version_id: SnippetVersionID) -> DbSnippetVersion:
    """Return the snippet version with that ID, or abort with 404 if
    unknown.
    """
    found = snippet_service.find_snippet_version(version_id)

    if found is not None:
        return found

    abort(404)
632
633
634 1 View Code Duplication
def _create_html_diff(
    from_version: DbSnippetVersion,
    to_version: DbSnippetVersion,
    attribute_name: str,
) -> Optional[str]:
    """Create an HTML diff of one attribute between two versions.

    The attribute named by `attribute_name` is read from both versions
    and the two values are diffed. Each side is described by the
    short-formatted creation timestamp of its version.
    """
    versions = (from_version, to_version)

    descriptions = [
        format_datetime_short(version.created_at) for version in versions
    ]
    texts = [getattr(version, attribute_name) for version in versions]

    # Argument order: from text, to text, from description,
    # to description.
    return text_diff_service.create_html_diff(*texts, *descriptions)
651
652
653 1
def _find_brand_for_scope(scope: Scope) -> Optional[Brand]:
654 1
    if scope.type_ != 'brand':
655 1
        return None
656
657
    return brand_service.find_brand(BrandID(scope.name))
658
659
660 1
def _find_site_for_scope(scope: Scope) -> Optional[Site]:
661 1
    if scope.type_ != 'site':
662 1
        return None
663
664
    return site_service.find_site(SiteID(scope.name))
665