Passed
Pull Request — develop (#124)
by inkhey
01:44
created

ContentApi.get_one()   B

Complexity

Conditions 6

Size

Total Lines 36
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 36
rs 8.4186
c 0
b 0
f 0
cc 6
nop 6
1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import traceback
6
import typing
7
from contextlib import contextmanager
8
9
import sqlalchemy
10
import transaction
11
from depot.io.utils import FileIntent
12
from depot.manager import DepotManager
13
from preview_generator.exception import UnavailablePreviewType
14
from preview_generator.exception import UnsupportedMimeType
15
from preview_generator.manager import PreviewManager
16
from sqlalchemy import desc
17
from sqlalchemy import func
18
from sqlalchemy import or_
19
from sqlalchemy.orm import Query
20
from sqlalchemy.orm import aliased
21
from sqlalchemy.orm import joinedload
22
from sqlalchemy.orm.attributes import QueryableAttribute
23
from sqlalchemy.orm.attributes import get_history
24
from sqlalchemy.orm.exc import NoResultFound
25
from sqlalchemy.orm.session import Session
26
from sqlalchemy.sql.elements import and_
27
28
from tracim_backend.app_models.contents import content_status_list
29
from tracim_backend.app_models.contents import content_type_list
30
from tracim_backend.app_models.contents import FOLDER_TYPE
31
from tracim_backend.app_models.contents import ContentStatus
32
from tracim_backend.app_models.contents import ContentType
33
from tracim_backend.app_models.contents import GlobalStatus
34
from tracim_backend.exceptions import ContentInNotEditableState
35
from tracim_backend.exceptions import ContentFilenameAlreadyUsedInFolder
36
from tracim_backend.exceptions import ContentNotFound
37
from tracim_backend.exceptions import ContentTypeNotExist
38
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
39
from tracim_backend.exceptions import EmptyLabelNotAllowed
40
from tracim_backend.exceptions import PageOfPreviewNotFound
41
from tracim_backend.exceptions import PreviewDimNotAllowed
42
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
43
from tracim_backend.exceptions import SameValueError
44
from tracim_backend.exceptions import TracimUnavailablePreviewType
45
from tracim_backend.exceptions import UnallowedSubContent
46
from tracim_backend.exceptions import UnavailablePreview
47
from tracim_backend.exceptions import WorkspacesDoNotMatch
48
from tracim_backend.lib.core.notifications import NotifierFactory
49
from tracim_backend.lib.utils.logger import logger
50
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
51
from tracim_backend.lib.utils.translation import Translator
52
from tracim_backend.lib.utils.utils import cmp_to_key
53
from tracim_backend.lib.utils.utils import current_date_for_filename
54
from tracim_backend.lib.utils.utils import preview_manager_page_format
55
from tracim_backend.models.auth import User
56
from tracim_backend.models.context_models import ContentInContext
57
from tracim_backend.models.context_models import PreviewAllowedDim
58
from tracim_backend.models.context_models import RevisionInContext
59
from tracim_backend.models.data import ActionDescription
60
from tracim_backend.models.data import Content
61
from tracim_backend.models.data import ContentRevisionRO
62
from tracim_backend.models.data import NodeTreeItem
63
from tracim_backend.models.data import RevisionReadStatus
64
from tracim_backend.models.data import UserRoleInWorkspace
65
from tracim_backend.models.data import Workspace
66
from tracim_backend.models.revision_protection import new_revision
67
68
__author__ = 'damien'
69
70
71
# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
72
def compare_content_for_sorting_by_type_and_name(
        content1: Content,
        content2: Content
) -> int:
    """
    Three-way comparison of two contents: by type first, then by label.

    Contents sharing the same type are ordered by their lower-cased label.
    Contents of different types are ordered folder, page, thread, file;
    any other type is ranked after those four instead of raising
    ValueError.

    :param content1: first content to compare
    :param content2: second content to compare
    :return:    1 if content1 > content2
                -1 if content1 < content2
                0 if content1 = content2
    """
    if content1.type == content2.type:
        # Compute each label once; comparison is case-insensitive.
        label1 = content1.get_label().lower()
        label2 = content2.get_label().lower()
        # Boolean subtraction yields exactly -1, 0 or 1.
        return (label1 > label2) - (label1 < label2)

    # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
    content_type_order = [
        content_type_list.Folder.slug,
        content_type_list.Page.slug,
        content_type_list.Thread.slug,
        content_type_list.File.slug,
    ]

    def type_rank(content_type: str) -> int:
        # Types missing from the hard-coded order sort after known ones.
        try:
            return content_type_order.index(content_type)
        except ValueError:
            return len(content_type_order)

    rank1 = type_rank(content1.type)
    rank2 = type_rank(content2.type)
    return (rank1 > rank2) - (rank1 < rank2)
109
110
# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
111
def compare_tree_items_for_sorting_by_type_and_name(
        item1: NodeTreeItem,
        item2: NodeTreeItem
) -> int:
    """
    Compare two tree items by comparing the contents they wrap.

    :param item1: first tree item; its ``node`` is the compared content
    :param item2: second tree item; its ``node`` is the compared content
    :return: result of compare_content_for_sorting_by_type_and_name on
             both nodes
    """
    first_content = item1.node
    second_content = item2.node
    return compare_content_for_sorting_by_type_and_name(
        first_content,
        second_content,
    )
116
117
118
class ContentApi(object):
119
120
    SEARCH_SEPARATORS = ',| '
121
    SEARCH_DEFAULT_RESULT_NB = 50
122
123
    # DISPLAYABLE_CONTENTS = (
124
    #     content_type_list.Folder.slug,
125
    #     content_type_list.File.slug,
126
    #     content_type_list.Comment.slug,
127
    #     content_type_list.Thread.slug,
128
    #     content_type_list.Page.slug,
129
    #     content_type_list.Page.slugLegacy,
130
    #     ContentType.MarkdownPage,
131
    # )
132
133
    def __init__(
            self,
            session: Session,
            current_user: typing.Optional[User],
            config,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
            show_active: bool = True,
            all_content_in_treeview: bool = True,
            force_show_all_types: bool = False,
            disable_user_workspaces_filter: bool = False,
    ) -> None:
        """
        Bind the API to a database session, a config and an optional user.

        :param session: database session used to build every query
        :param current_user: user on whose behalf queries are made, or None
        :param config: application config; PREVIEW_CACHE_DIR is read from
        it and the object itself is given to the translator
        :param show_archived: keep archived contents in filtered queries
        :param show_deleted: keep deleted contents in filtered queries
        :param show_temporary: keep temporary contents in filtered queries
        :param show_active: when False, filtered queries keep only
        deleted or archived contents
        :param all_content_in_treeview: stored as
        _show_all_type_of_contents_in_treeview
        :param force_show_all_types: skip the allowed-content-type filter
        :param disable_user_workspaces_filter: skip the filter on the
        workspaces of the current user
        """
        # Collaborators and identity.
        self._session = session
        self._config = config
        self._user = current_user
        self._user_id = current_user.user_id if current_user else None

        # Visibility flags read by the query builders.
        self._show_archived = show_archived
        self._show_deleted = show_deleted
        self._show_temporary = show_temporary
        self._show_active = show_active
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
        self._force_show_all_types = force_show_all_types
        self._disable_user_workspaces_filter = disable_user_workspaces_filter

        self.preview_manager = PreviewManager(
            self._config.PREVIEW_CACHE_DIR,
            create_folder=True,
        )

        # Translate in the user's language when there is one, else fall
        # back to the application default.
        user_lang = current_user.lang if current_user else None
        self.translator = Translator(
            app_config=self._config,
            default_lang=user_lang or DEFAULT_FALLBACK_LANG,
        )
164
165
    @contextmanager
166
    def show(
167
            self,
168
            show_archived: bool=False,
169
            show_deleted: bool=False,
170
            show_temporary: bool=False,
171
    ) -> typing.Generator['ContentApi', None, None]:
172
        """
173
        Use this method as context manager to update show_archived,
174
        show_deleted and show_temporary properties during context.
175
        :param show_archived: show archived contents
176
        :param show_deleted:  show deleted contents
177
        :param show_temporary:  show temporary contents
178
        """
179
        previous_show_archived = self._show_archived
180
        previous_show_deleted = self._show_deleted
181
        previous_show_temporary = self._show_temporary
182
183
        try:
184
            self._show_archived = show_archived
185
            self._show_deleted = show_deleted
186
            self._show_temporary = show_temporary
187
            yield self
188
        finally:
189
            self._show_archived = previous_show_archived
190
            self._show_deleted = previous_show_deleted
191
            self._show_temporary = previous_show_temporary
192
193
    def get_content_in_context(self, content: Content) -> ContentInContext:
        """
        Wrap a content into a ContentInContext built with the session,
        config and user of this API.

        :param content: content to wrap
        :return: the ContentInContext for this content
        """
        return ContentInContext(
            content,
            self._session,
            self._config,
            self._user,
        )
195
196
    def get_revision_in_context(
            self,
            revision: ContentRevisionRO,
    ) -> RevisionInContext:
        """
        Wrap a revision into a RevisionInContext built with the session,
        config and user of this API.

        :param revision: revision to wrap
        :return: the RevisionInContext for this revision
        """
        # TODO - G.M - 2018-06-173 - create revision in context object
        return RevisionInContext(
            revision,
            self._session,
            self._config,
            self._user,
        )
199
    
200
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
        """
        Build the Content/ContentRevisionRO join condition.

        The condition matches a content with the revision whose
        revision_id is the highest one among the revisions of that content.

        :return: Content/ContentRevision query join condition
        """
        # Correlated subquery: highest revision_id of the current content.
        newest_revision_id = (
            self._session.query(ContentRevisionRO.revision_id)
            .filter(ContentRevisionRO.content_id == Content.id)
            .order_by(ContentRevisionRO.revision_id.desc())
            .limit(1)
            .correlate(Content)
        )
        return and_(
            Content.id == ContentRevisionRO.content_id,
            ContentRevisionRO.revision_id == newest_revision_id,
        )
212
213
    def get_canonical_query(self) -> Query:
        """
        Return the base Content query, joined with ContentRevisionRO using
        the condition built by _get_revision_join.

        :return: Content/ContentRevision Query
        """
        content_query = self._session.query(Content)
        return content_query.join(
            ContentRevisionRO,
            self._get_revision_join(),
        )
220
221
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
222
    @classmethod
    def sort_tree_items(
        cls,
        content_list: typing.List[NodeTreeItem],
    ) -> typing.List[NodeTreeItem]:
        """
        Sort tree items in place, by type then by label.

        Ordering is the one defined by
        compare_tree_items_for_sorting_by_type_and_name.

        :param content_list: tree items to sort; the list is modified
        :return: the same list object, now sorted
        """
        # The former copy loop built a list that was never used; dropped.
        content_list.sort(key=cmp_to_key(
            compare_tree_items_for_sorting_by_type_and_name,
        ))
        return content_list
236
237
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
238
    @classmethod
    def sort_content(
        cls,
        content_list: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Sort contents in place, by type then by label.

        :param content_list: contents to sort; the list is modified
        :return: the same list object, now sorted
        """
        sort_key = cmp_to_key(compare_content_for_sorting_by_type_and_name)
        content_list.sort(key=sort_key)
        return content_list
245
246
    def __real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        """
        Build the content query without any deleted/archived/temporary
        filtering.

        Applied filters: allowed content types (unless
        force_show_all_types), the given workspace (if any) and the
        security layer on the workspaces of the current user (unless there
        is no user or disable_user_workspaces_filter is set).

        :param workspace: restrict the query to this workspace if given
        :return: filtered Content query
        """
        query = self.get_canonical_query()

        # Exclude non displayable types
        if not self._force_show_all_types:
            allowed_slugs = content_type_list.query_allowed_types_slugs()
            query = query.filter(Content.type.in_(allowed_slugs))

        if workspace:
            query = query.filter(
                Content.workspace_id == workspace.workspace_id
            )

        # Security layer: only applies when a user is provided and the
        # filter has not been explicitly disabled.
        if not self._user or self._disable_user_workspaces_filter:
            return query

        user = self._session.query(User).get(self._user_id)
        # Workspaces where the user is at least reader.
        readable_workspace_ids = [
            role.workspace_id
            for role in user.roles
            if role.role >= UserRoleInWorkspace.READER
        ]
        in_readable_workspace = Content.workspace_id.in_(
            readable_workspace_ids
        )
        # Also allow access to non workspace document when he is owner.
        owned_outside_workspace = and_(
            Content.workspace_id == None,  # noqa: E711 (SQL expression)
            Content.owner_id == self._user_id,
        )
        return query.filter(
            or_(in_readable_workspace, owned_outside_workspace)
        )
276
277
    def _base_query(self, workspace: Workspace=None) -> Query:
        """
        Build the content query honouring the show_active, show_deleted,
        show_archived and show_temporary flags of this API.

        :param workspace: restrict the query to this workspace if given
        :return: filtered Content query
        """
        query = self.__real_base_query(workspace)

        if not self._show_active:
            # Active contents hidden: keep only deleted or archived ones.
            deleted_or_archived = or_(
                Content.is_deleted == True,  # noqa: E712 (SQL expression)
                Content.is_archived == True,  # noqa: E712 (SQL expression)
            )
            query = query.filter(deleted_or_archived)

        # Each state that is not shown is filtered out, in this order.
        state_filters = (
            (self._show_deleted, Content.is_deleted),
            (self._show_archived, Content.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in state_filters:
            if not is_shown:
                query = query.filter(state_column == False)  # noqa: E712

        return query
295
296
    def __revisions_real_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the revision query without any deleted/archived/temporary
        filtering.

        Applied filters: allowed content types (unless
        force_show_all_types), the given workspace (if any) and the
        workspaces where the current user is at least reader (if there is
        a user).

        :param workspace: restrict the query to this workspace if given
        :return: filtered ContentRevisionRO query
        """
        result = self._session.query(ContentRevisionRO)

        # Exclude non displayable types.
        # The DISPLAYABLE_CONTENTS class attribute is commented out, so
        # reading it raised AttributeError; rely on the allowed slugs of
        # content_type_list, as the content query does.
        # NOTE(review): the filter is written against Content.type although
        # the query selects ContentRevisionRO - kept as it was, to confirm.
        if not self._force_show_all_types:
            result = result.filter(
                Content.type.in_(content_type_list.query_allowed_types_slugs())
            )

        if workspace:
            result = result.filter(
                ContentRevisionRO.workspace_id == workspace.workspace_id
            )

        if self._user:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces
            workspace_ids = [
                r.workspace_id for r in user.roles
                if r.role >= UserRoleInWorkspace.READER
            ]
            result = result.filter(
                ContentRevisionRO.workspace_id.in_(workspace_ids)
            )

        return result
317
318
    def _revisions_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the revision query honouring the show_deleted, show_archived
        and show_temporary flags of this API.

        :param workspace: restrict the query to this workspace if given
        :return: filtered ContentRevisionRO query
        """
        query = self.__revisions_real_base_query(workspace)

        # NOTE(review): the temporary filter targets Content while the two
        # others target ContentRevisionRO - kept as is, to confirm.
        state_filters = (
            (self._show_deleted, ContentRevisionRO.is_deleted),
            (self._show_archived, ContentRevisionRO.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in state_filters:
            if not is_shown:
                query = query.filter(state_column == False)  # noqa: E712

        return query
334
335
    def _hard_filtered_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Same as _base_query for the deleted/archived/temporary flags, but
        every state that is filtered on the content is also filtered on
        its parent.

        This is required for search() function which also search in
        comments (for example) which may be 'not deleted' while the
        associated content is deleted.

        NOTE(review): each active filter joins the parent content; with
        the default join this presumably drops contents without parent -
        confirm this is intended.

        :param workspace: restrict the query to this workspace if given
        :return: filtered Content query
        """
        query = self.__real_base_query(workspace)

        # (flag saying the state is shown, name of the state attribute)
        state_filters = (
            (self._show_deleted, 'is_deleted'),
            (self._show_archived, 'is_archived'),
            (self._show_temporary, 'is_temporary'),
        )
        for is_shown, state_name in state_filters:
            if is_shown:
                continue
            # A fresh alias per state: the parent is joined once for each
            # filtered state.
            parent = aliased(Content)
            own_state = getattr(Content, state_name)
            parent_state = getattr(parent, state_name)
            query = query.join(parent, Content.parent)
            query = query.filter(own_state == False)  # noqa: E712
            query = query.filter(parent_state == False)  # noqa: E712

        return query
369
370
    def get_base_query(
        self,
        workspace: Workspace,
    ) -> Query:
        """
        Public access to the state-filtered content query (_base_query).

        :param workspace: workspace the query is restricted to
        :return: filtered Content query
        """
        query = self._base_query(workspace)
        return query
375
376
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
377
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
378
    #     """
379
    #     This method returns child items (folders or items) for left bar treeview.
380
    # 
381
    #     :param parent:
382
    #     :param workspace:
383
    #     :param filter_by_allowed_content_types:
384
    #     :param removed_item_ids:
385
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
386
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
387
    #     :return:
388
    #     """
389
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
390
    #     removed_item_ids = removed_item_ids or []  # FDV
391
    # 
392
    #     if not allowed_node_types:
393
    #         allowed_node_types = [content_type_list.Folder.slug]
394
    #     elif allowed_node_types==content_type_list.Any_SLUG:
395
    #         allowed_node_types = ContentType.all()
396
    # 
397
    #     parent_id = parent.content_id if parent else None
398
    #     folders = self._base_query(workspace).\
399
    #         filter(Content.parent_id==parent_id).\
400
    #         filter(Content.type.in_(allowed_node_types)).\
401
    #         filter(Content.content_id.notin_(removed_item_ids)).\
402
    #         all()
403
    # 
404
    #     if not filter_by_allowed_content_types or \
405
    #                     len(filter_by_allowed_content_types)<=0:
406
    #         # Standard case for the left treeview: we want to show all contents
407
    #         # in the left treeview... so we still filter because for example
408
    #         # comments must not appear in the treeview
409
    #         return [folder for folder in folders \
410
    #                 if folder.type in ContentType.allowed_types_for_folding()]
411
    # 
412
    #     # Now this is a case of Folders only (used for moving content)
413
    #     # When moving a content, you must get only folders that allow to be filled
414
    #     # with the type of content you want to move
415
    #     result = []
416
    #     for folder in folders:
417
    #         for allowed_content_type in filter_by_allowed_content_types:
418
    # 
419
    #             is_folder = folder.type == content_type_list.Folder.slug
420
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
421
    # 
422
    #             if is_folder and content_type__allowed:
423
    #                 result.append(folder)
424
    #                 break
425
    # 
426
    #     return result
427
428
    def _is_filename_available(
429
            self,
430
            filename: str,
431
            workspace: Workspace,
432
            parent: Content = None,
433
            exclude_content_id: int = None,
434
    ) -> bool:
435
        """
436
        Check if content label is free
437
        :param filename: content label
438
        :param workspace: workspace of the content
439
        :param parent: parent of the content
440
        :param exclude_content_id: exclude a specific content_id (useful
441
        to verify  other content for content update)
442
        :return: True if content label is available
443
        """
444
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
445
        # with empty filename like comment.
446
        assert filename
447
        assert workspace
448
        label, file_extension = os.path.splitext(filename)
449
        query = self.get_base_query(workspace)
450
451
        if parent:
452
            query = query.filter(Content.parent_id == parent.content_id)
453
        else:
454
            query = query.filter(Content.parent_id == None)
455
456
        if exclude_content_id:
457
            query = query.filter(Content.content_id != exclude_content_id)
458
        query = query.filter(Content.workspace_id == workspace.workspace_id)
459
460
461
        nb_content_with_the_filename = query.filter(
462
            Content.file_name == filename
463
        ).count()
464
        if nb_content_with_the_filename == 0:
465
            return True
466
        elif nb_content_with_the_filename == 1:
467
            return False
468
        else:
469
            critical_error_text = 'Something is wrong in the database ! '\
470
                                  'Content filename should be unique ' \
471
                                  'in a same folder in database' \
472
                                  'but you have {nb} content with ' \
473
                                  'filename {filename} ' \
474
                                  'in workspace {workspace_id}'
475
476
            critical_error_text = critical_error_text.format(
477
                nb=nb_content_with_the_filename,
478
                filename=filename,
479
                workspace_id=workspace.workspace_id,
480
            )
481
            if parent:
482
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
483
                    text=critical_error_text,
484
                    parent_id=parent.parent_id
485
                )
486
            logger.critical(self, critical_error_text)
487
            return False
488
489
    def _prepare_filename(
490
            self,
491
            label: str,
492
            file_extension: str,
493
    ) -> str:
494
        """
495
        generate correct file_name from label and file_extension
496
        :return: valid
497
        """
498
        # TODO - G.M - 2018-10-11 - Find a way to
499
        # Refactor this in order to use same method
500
        # in both contentLib and .file_name of content
501
        return '{label}{file_extension}'.format(
502
            label=label,
503
            file_extension=file_extension
504
        )
505
506
    def _is_filename_available_or_raise(
        self,
        filename: str,
        workspace: Workspace,
        parent: Content = None,
        exclude_content_id: int = None,
    ) -> bool:
        """
        Same as _is_filename_available but raise exception instead of
        returning False if content filename is already used.

        :param filename: complete filename to look for
        :param workspace: workspace of the content
        :param parent: parent of the content, if any
        :param exclude_content_id: content_id to ignore during the check
        :return: True when the filename is available
        :raise ContentFilenameAlreadyUsedInFolder: when the filename is
        already used
        """
        if self._is_filename_available(
            filename,
            workspace,
            parent,
            exclude_content_id
        ):
            return True
        # INFO - G.M - 2018-10-11 - prepare exception message
        exception_message = 'A Content already exist with the same filename ' \
            '{filename}  in workspace {workspace_id}'
        exception_message = exception_message.format(
            filename=filename,
            workspace_id=workspace.workspace_id,
        )
        if parent:
            # Report the folder that was searched, i.e. the parent itself
            # (not the parent of the parent).
            exception_message = '{text} and parent as content {parent_id}'.format(
                text=exception_message,
                parent_id=parent.content_id
            )
        raise ContentFilenameAlreadyUsedInFolder(exception_message)
537
538
    def create(
            self,
            content_type_slug: str,
            workspace: Workspace,
            parent: Content = None,
            label: str = '',
            filename: str = '',
            do_save: bool = False,
            is_temporary: bool = False,
            do_notify: bool = True,
    ) -> Content:
        """
        Create a new content of the given type.

        Exactly one of label or filename is expected, except for comments
        which get an empty label and folders which get a generated label
        when none is given.

        :param content_type_slug: slug of the content type to create
        :param workspace: workspace of the content; when falsy, the
        workspace of parent is used
        :param parent: parent content, if any
        :param label: label of the content; the file extension of the
        content type is used to build the filename
        :param filename: complete filename, used instead of label
        :param do_save: if True, add the content to the session and save it
        :param is_temporary: value stored in content.is_temporary
        :param do_notify: passed to save() when do_save is True
        :return: the created content
        :raise UnallowedSubContent: content type not allowed in parent or
        in workspace
        :raise ContentFilenameAlreadyUsedInFolder: filename already used
        :raise EmptyLabelNotAllowed: no label nor filename for a content
        which is not a comment
        """
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != content_type_list.Any_SLUG
        assert not (label and filename)

        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
        # can call here with None. Must update create_comment to require the
        # workspace.
        # Resolved before anything else so that every step below (including
        # folder label generation) works with the effective workspace.
        if not workspace:
            workspace = parent.workspace

        if content_type_slug == FOLDER_TYPE and not label:
            label = self.generate_folder_label(workspace, parent)

        content_type = content_type_list.get_one_by_slug(content_type_slug)

        # The parent may restrict which content types it accepts.
        if parent and parent.properties and 'allowed_content' in parent.properties:  # nopep8
            allowed_content = parent.properties['allowed_content']
            if content_type.slug not in allowed_content \
                    or not allowed_content[content_type.slug]:
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))

        # So may the workspace.
        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        content_id=workspace.workspace_id,
                    )
                )

        content = Content()
        if label:
            file_extension = content_type.file_extension or ''
            filename = self._prepare_filename(label, file_extension)
            self._is_filename_available_or_raise(
                filename,
                workspace,
                parent,
            )
            # TODO - G.M - 2018-10-15 - Set file extension and label
            # explicitly instead of filename in order to have correct
            # label/file-extension separation.
            content.label = label
            content.file_extension = file_extension
        elif filename:
            self._is_filename_available_or_raise(
                filename,
                workspace,
                parent
            )
            # INFO - G.M - 2018-07-04 - File_name setting automatically
            # set label and file_extension
            content.file_name = filename
        elif content_type_slug == content_type_list.Comment.slug:
            # INFO - G.M - 2018-07-16 - Default label for comments is
            # empty string.
            content.label = ''
        else:
            raise EmptyLabelNotAllowed(
                'Content of type {} should have a valid label'.format(content_type_slug)  # nopep8
            )

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content
617
618
    def create_comment(
            self,
            workspace: Workspace = None,
            parent: Content = None,
            content: str = '',
            do_save=False,
            do_notify=True,
    ) -> Content:
        """
        Create a comment on the given parent content.

        :param workspace: workspace handed to create()
        :param parent: commented content; must be set and must not be a
        folder
        :param content: text of the comment, stored as its description
        :param do_save: if True, save the comment
        :param do_notify: passed to save() when do_save is True
        :return: the created comment
        :raise ContentInNotEditableState: parent is not editable
        :raise EmptyCommentContentNotAllowed: content is empty
        """
        # TODO: check parent allowed_type and workspace allowed_ type
        assert parent and parent.type != FOLDER_TYPE
        if not self.is_editable(parent):
            raise ContentInNotEditableState(
                "Can't create comment on content, you need to change his status or state (deleted/archived) before any change."
            )
        if not content:
            raise EmptyCommentContentNotAllowed()

        # Saving is done below, with the COMMENT action instead of the
        # CREATION one, so create() must neither save nor notify.
        comment = self.create(
            content_type_list.Comment.slug,
            workspace,
            parent,
            label='',
            do_save=False,
            do_notify=False,
        )
        comment.description = content
        comment.revision_type = ActionDescription.COMMENT

        if not do_save:
            return comment
        self.save(comment, ActionDescription.COMMENT, do_notify=do_notify)
        return comment
642
643
    def get_one_from_revision(
            self,
            content_id: int,
            content_type: str,
            workspace: Workspace = None,
            revision_id=None,
    ) -> Content:
        """
        This method is a hack to convert a node revision item into a node:
        the content is fetched with get_one() and flagged to be serialized
        with the given revision.

        :param content_id: id of the content
        :param content_type: type of the content, as expected by get_one()
        :param workspace: workspace handed to get_one()
        :param revision_id: id of the revision to serialize
        :return: the content, with revision_to_serialize set
        :raise ValueError: the revision does not belong to the content
        """
        content = self.get_one(content_id, content_type, workspace)
        revision_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.revision_id == revision_id
        )
        revision = revision_query.one()

        if revision.content_id != content.content_id:
            raise ValueError('Revision not found for given content')

        content.revision_to_serialize = revision.revision_id
        return content
662
663
    def get_one(
664
            self,
665
            content_id: int,
666
            content_type: str,
667
            workspace: Workspace=None,
668
            parent: Content=None,
669
            ignore_content_state_filter: bool=False
670
    ) -> typing.Optional[Content]:
671
672
        if not content_id:
673
            return None
674
675
        if ignore_content_state_filter:
676
            base_request = self.__real_base_query(workspace)
677
        else:
678
            base_request = self._base_query(workspace)
679
680
681
        base_request = base_request.filter(Content.content_id==content_id)
682
683
        if content_type!=content_type_list.Any_SLUG:
684
            base_request = base_request.filter(Content.type==content_type)
685
686
        if parent:
687
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
688
689
        try:
690
            content = base_request.one()
691
        except NoResultFound as exc:
692
            # TODO - G.M - 2018-07-16 - Add better support for all different
693
            # error case who can happened here
694
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
695
            # wrong access to this workspace, wrong base filter according
696
            # to content_status.
697
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
698
        return content
699
700
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
701
        """
702
        This method allow us to get directly any revision with its id
703
        :param revision_id: The content's revision's id that we want to return
704
        :param content: The content related to the revision, if None do not
705
        check if revision is related to this content.
706
        :return: An item Content linked with the correct revision
707
        """
708
        assert revision_id is not None# DYN_REMOVE
709
710
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
711
        if content and revision.content_id != content.content_id:
712
            raise RevisionDoesNotMatchThisContent(
713
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
714
                    revision_id=revision.revision_id,
715
                    content_id=content.content_id,
716
                    )
717
            )
718
        return revision
719
720
    # INFO - A.P - 2017-07-03 - python file object getter
721
    # in case of we cook a version of preview manager that allows a pythonic
722
    # access to files
723
    # def get_one_revision_file(self, revision_id: int = None):
724
    #     """
725
    #     This function allows us to directly get a Python file object from its
726
    #     revision identifier.
727
    #     :param revision_id: The revision id of the file we want to return
728
    #     :return: The corresponding Python file object
729
    #     """
730
    #     revision = self.get_one_revision(revision_id)
731
    #     return DepotManager.get().get(revision.depot_file)
732
733
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
        """
        Get the path of the file stored in the depot for a given revision.

        :param revision_id: id of the revision whose file path is wanted
        :return: path of the stored file
        """
        revision = self.get_one_revision(revision_id)
        stored_file = DepotManager.get().get(revision.depot_file)  # type: StoredFile
        # INFO - the path is read from a private attribute of the stored file
        return stored_file._file_path
745
746
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
747
    def get_one_by_label_and_parent(
            self,
            content_label: str,
            content_parent: Content=None,
    ) -> Content:
        """
        Query the database for the single Content matching a name and a parent.

        The name is split into a base name and an extension: files must match
        both, threads and pages must match the base name only, folders must
        match the complete name.

        :param content_label: label of the content, or its filename
        (label + extension)
        :param content_parent: parent content; when not given, the content is
        searched among contents without parent and without workspace
        restriction passed to the base query.
        :return: the corresponding Content
        """
        if content_parent:
            workspace = content_parent.workspace
        else:
            workspace = None
        query = self._base_query(workspace)

        if content_parent:
            parent_id = content_parent.content_id
        else:
            parent_id = None
        query = query.filter(Content.parent_id == parent_id)

        file_name, file_extension = os.path.splitext(content_label)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        file_match = and_(
            Content.type == content_type_list.File.slug,
            Content.label == file_name,
            Content.file_extension == file_extension,
        )
        thread_match = and_(
            Content.type == content_type_list.Thread.slug,
            Content.label == file_name,
        )
        page_match = and_(
            Content.type == content_type_list.Page.slug,
            Content.label == file_name,
        )
        folder_match = and_(
            Content.type == content_type_list.Folder.slug,
            Content.label == content_label,
        )
        matching_contents = query.filter(
            or_(file_match, thread_match, page_match, folder_match)
        )
        return matching_contents.one()
789
790
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
791
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: [str]=None,
    ):
        """
        Find a content from its label, its workspace and, optionally, the
        labels of its parent folders.

        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing the content and its parents
        :param content_parent_labels: ordered list of folder labels describing
        the path to the content (workspace label excluded).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder.
        When empty or None, the content is searched at workspace root.
        :return: Found Content
        """
        base_query = self._base_query(workspace)

        # Resolve the parent folder only when a parent path is given
        parent_folder = None
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )

        # Restrict to contents whose label matches the given name
        content_query = self.filter_query_for_content_label_as_path(
            query=base_query,
            content_label_as_file=content_label,
        )

        # Restrict to the resolved parent folder, or to root contents
        if parent_folder:
            parent_filter = Content.parent_id == parent_folder.content_id
        else:
            parent_filter = Content.parent_id == None
        content_query = content_query.filter(parent_filter)

        # Restrict to the given workspace
        content_query = content_query.filter(
            Content.workspace_id == workspace.workspace_id,
        )

        ordered_query = content_query.order_by(Content.revision_id.desc())
        return ordered_query.one()
843
844
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
845
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: [str],
            workspace: Workspace,
    ) -> Content:
        """
        Walk down a list of folder labels and return the last folder reached.

        TODO BS 20161124: Not safe if web interface allow folder duplicate names
        :param path_labels: ordered list of folder labels describing the path
        (workspace label excluded).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder, or None when path_labels is empty
        """
        base_query = self._base_query(workspace)
        current_folder = None

        for label in path_labels:
            # The first label is searched at workspace root, the following
            # ones inside the folder found at the previous step
            if current_folder:
                parent_filter = Content.parent_id == current_folder.content_id
            else:
                parent_filter = Content.parent_id == None

            level_query = base_query.filter(
                Content.type == content_type_list.Folder.slug,
                Content.label == label,
                Content.workspace_id == workspace.workspace_id,
            )
            level_query = level_query.filter(parent_filter)
            level_query = level_query.order_by(Content.revision_id.desc())

            # Exactly one folder is expected at each level
            current_folder = level_query.one()

        return current_folder
887
888
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
889
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Restrict a query to the contents whose name matches the given label.

        The label is split into base name and extension: files, threads and
        pages must match both parts, folders must match the complete label.

        :param query: query to modify
        :param content_label_as_file: name to search, given in its "file"
        form (label followed by the file extension, if any)
        :param is_case_sensitive: when False (default), comparisons are done
        on lower-cased values
        :return: modified query
        """
        file_name, file_extension = os.path.splitext(content_label_as_file)

        if is_case_sensitive:
            label_filter = Content.label == content_label_as_file
            file_name_filter = Content.label == file_name
            file_extension_filter = Content.file_extension == file_extension
        else:
            label_filter = (
                func.lower(Content.label) == func.lower(content_label_as_file)
            )
            file_name_filter = (
                func.lower(Content.label) == func.lower(file_name)
            )
            file_extension_filter = (
                func.lower(Content.file_extension) == func.lower(file_extension)  # nopep8
            )

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        slugs_matched_by_name_and_extension = (
            content_type_list.File.slug,
            content_type_list.Thread.slug,
            content_type_list.Page.slug,
        )
        type_clauses = [
            and_(
                Content.type == slug,
                file_name_filter,
                file_extension_filter,
            )
            for slug in slugs_matched_by_name_and_extension
        ]
        type_clauses.append(
            and_(
                Content.type == content_type_list.Folder.slug,
                label_filter,
            )
        )
        return query.filter(or_(*type_clauses))
940
941
    def get_pdf_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        file_extension: str,
    ) -> str:
        """
        Get the path of the pdf preview of one page of a content revision.

        :param content_id: id of content (only used in error messages)
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content
        :param file_extension: file extension of the file
        :return: preview_path as string
        :raises PageOfPreviewNotFound: the requested page is not lower than
        the page count given by the preview manager
        :raises TracimUnavailablePreviewType: the preview manager raised
        UnavailablePreviewType
        :raises UnavailablePreview: the mimetype is not supported or any other
        error happened during preview generation
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            page_count = self.preview_manager.get_page_nb(
                file_path,
                file_ext=file_extension,
            )
            if page_number >= page_count:
                page_error = 'page_number {page_number} of content {content_id} does not exist'.format(  # nopep8
                    page_number=page_number,
                    content_id=content_id
                )
                raise PageOfPreviewNotFound(page_error)
            pdf_preview_path = self.preview_manager.get_pdf_preview(
                file_path,
                page=page_number,
                file_ext=file_extension,
            )
        except PageOfPreviewNotFound:
            # passthrough as this exception is already supported with
            # specific error code.
            raise
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            unavailable_message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        except Exception as exc:
            # Unexpected failure: log it with its traceback, then expose it
            # as a regular "preview unavailable" error
            warning_message = "Unknown Preview_Generator Exception Occured : {}".format(str(exc))  # nopep8
            logger.warning(self, warning_message)
            logger.warning(self, traceback.format_exc())
            unavailable_message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        return pdf_preview_path
992
993
    def get_full_pdf_preview_path(self, revision_id: int, file_extension: str) -> str:
        """
        Get the path of the full (all pages) pdf preview of a revision.

        :param revision_id: id of revision
        :param file_extension: file extension of the file
        :return: path of the full pdf preview of this revision
        :raises TracimUnavailablePreviewType: the preview manager raised
        UnavailablePreviewType
        :raises UnavailablePreview: the mimetype is not supported or any other
        error happened during preview generation
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            pdf_preview_path = self.preview_manager.get_pdf_preview(
                file_path,
                file_ext=file_extension,
            )
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            unavailable_message = 'No preview available for revision {}'.format(revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        except Exception as exc:
            # Unexpected failure: log it with its traceback, then expose it
            # as a regular "preview unavailable" error
            warning_message = "Unknown Preview_Generator Exception Occured : {}".format(str(exc))  # nopep8
            logger.warning(self, warning_message)
            logger.warning(self, traceback.format_exc())
            unavailable_message = 'No preview available for revision {}'.format(revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        return pdf_preview_path
1019
1020
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Get the jpg preview settings read from the configuration: the
        "restricted dimensions" flag and the list of allowed dimensions.

        :return: PreviewAllowedDim built from these two config values
        """
        config = self._config
        restricted_dims = config.PREVIEW_JPG_RESTRICTED_DIMS
        allowed_dims = config.PREVIEW_JPG_ALLOWED_DIMS
        return PreviewAllowedDim(restricted_dims, allowed_dims)
1028
1029
    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        file_extension: str,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Get the path of the jpg preview of one page of a content revision.

        When neither width nor height is given, the first dimension of the
        configured allowed dimensions is used.

        :param content_id: id of content (only used in error messages)
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content
        :param file_extension: file extension of the file
        :param width: width in pixel
        :param height: height in pixel
        :return: preview_path as string
        :raises PageOfPreviewNotFound: the requested page is not lower than
        the page count given by the preview manager
        :raises PreviewDimNotAllowed: dimensions are restricted by config and
        the requested size is not one of the allowed ones
        :raises UnavailablePreview: the mimetype is not supported or any other
        error happened during preview generation
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            page_count = self.preview_manager.get_page_nb(
                file_path,
                file_ext=file_extension,
            )
            if page_number >= page_count:
                page_error = 'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                    page_number=page_number,
                    revision_id=revision_id,
                    content_id=content_id,
                )
                raise PageOfPreviewNotFound(page_error)

            # No size requested at all: fall back on the first allowed one
            if not (width or height):
                width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
                height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height

            allowed_dim = any(
                width == preview_dim.width and height == preview_dim.height
                for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS
            )
            if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
                dim_error = 'Size {width}x{height} is not allowed for jpeg preview'.format(  # nopep8
                    width=width,
                    height=height,
                )
                raise PreviewDimNotAllowed(dim_error)

            jpg_preview_path = self.preview_manager.get_jpeg_preview(
                file_path,
                page=page_number,
                width=width,
                height=height,
                file_ext=file_extension,
            )
        except (PreviewDimNotAllowed, PageOfPreviewNotFound):
            # passthrough as those exceptions are already supported with
            # specific error code.
            raise
        except UnsupportedMimeType as exc:
            unavailable_message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        except Exception as exc:
            # Unexpected failure: log it with its traceback, then expose it
            # as a regular "preview unavailable" error
            warning_message = "Unknown Preview_Generator Exception Occured : {}".format(str(exc))  # nopep8
            logger.warning(self, warning_message)
            logger.warning(self, traceback.format_exc())
            unavailable_message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(unavailable_message) from exc
        return jpg_preview_path
1102
1103
    def _get_all_query(
        self,
        parent_ids: typing.List[int] = None,
        content_type_slug: str = content_type_list.Any_SLUG,
        workspace: Workspace = None,
        label: str = None,
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
        complete_path_to_id: int = None,
    ) -> Query:
        """
        Extended filter for better "get all data" query
        :param parent_ids: filter by parent_ids; the value 0 in this list
        stands for the workspace root (contents without parent). The given
        list is never modified.
        :param content_type_slug: filter by content_type slug
        :param workspace: filter by workspace
        :param label: keep only contents whose label contains this string
        (case-insensitive match done with ilike)
        :param complete_path_to_id: add all parent(root included) of content_id
        given there to parent_ids filter.
        :param order_by_properties: filter by properties can be both string of
        attribute or attribute of Model object from sqlalchemy(preferred way,
        QueryableAttribute object)
        :return: Query object
        """
        order_by_properties = order_by_properties or []  # FDV
        assert not parent_ids or isinstance(parent_ids, list)
        assert content_type_slug is not None
        assert not complete_path_to_id or isinstance(complete_path_to_id, int)
        resultset = self._base_query(workspace)

        # INFO - G.M - 2018-11-12 - Get list of all ancestror
        #  of content, workspace root included
        if complete_path_to_id:
            # Work on a private list: parent_ids defaults to None (appending
            # to it used to crash) and a list given by the caller must not be
            # modified in place.
            parent_ids = list(parent_ids) if parent_ids else []
            content = self.get_one(
                complete_path_to_id,
                content_type_list.Any_SLUG,
                ignore_content_state_filter=True
            )
            if content.parent_id:
                content = content.parent
                while content.parent_id:
                    parent_ids.append(content.content_id)
                    content = content.parent
                parent_ids.append(content.content_id)
            # TODO - G.M - 2018-11-12 - add workspace root to
            # parent_ids list when complete_path_to_id is set
            parent_ids.append(0)

        if content_type_slug != content_type_list.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type_object = content_type_list.get_one_by_slug(content_type_slug)  # nopep8
            all_slug_alias = [content_type_object.slug]
            if content_type_object.slug_alias:
                all_slug_alias.extend(content_type_object.slug_alias)
            resultset = resultset.filter(Content.type.in_(all_slug_alias))

        # Legacy convention: parent_ids=False means "root contents only"
        if parent_ids is False:
            resultset = resultset.filter(Content.parent_id == None)

        if parent_ids:
            # TODO - G.M - 2018-11-09 - Adapt list in order to deal with root
            # case properly
            # 0 is not a real content id: it asks for contents without parent
            allow_root = 0 in parent_ids
            allowed_parent_ids = [
                parent_id for parent_id in parent_ids if parent_id != 0
            ]
            if allow_root:
                resultset = resultset.filter(or_(Content.parent_id.in_(allowed_parent_ids), Content.parent_id == None))  # nopep8
            else:
                resultset = resultset.filter(Content.parent_id.in_(allowed_parent_ids))  # nopep8
        if label:
            resultset = resultset.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8

        for _property in order_by_properties:
            resultset = resultset.order_by(_property)

        return resultset
1181
1182
    def get_all(
            self,
            parent_ids: typing.List[int]=None,
            content_type: str=content_type_list.Any_SLUG,
            workspace: Workspace=None,
            label: str=None,
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
            complete_path_to_id: int = None,
    ) -> typing.List[Content]:
        """
        Return all contents matching the given filters.

        All parameters are forwarded to _get_all_query and the resulting
        query is executed.

        :param parent_ids: filter by parent_id
        :param complete_path_to_id: filter by path of content_id
        (add all parent, root included to parent_ids filter)
        :param content_type: filter by content_type slug
        :param workspace: filter by workspace
        :param label: filter by label
        :param order_by_properties: filter by properties can be both string of
        attribute or attribute of Model object from sqlalchemy(preferred way,
        QueryableAttribute object)
        :return: List of contents
        """
        query = self._get_all_query(
            parent_ids,
            content_type,
            workspace,
            label,
            order_by_properties or [],  # FDV
            complete_path_to_id,
        )
        return query.all()
1205
1206
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1207
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
1208
    #     """
1209
    #     Return parent_id childs of given content_types
1210
    #     :param parent_id: parent id
1211
    #     :param content_types: list of types
1212
    #     :param workspace: workspace filter
1213
    #     :return: list of content
1214
    #     """
1215
    #     resultset = self._base_query(workspace)
1216
    #     resultset = resultset.filter(Content.type.in_(content_types))
1217
    #
1218
    #     if parent_id:
1219
    #         resultset = resultset.filter(Content.parent_id==parent_id)
1220
    #     if parent_id is False:
1221
    #         resultset = resultset.filter(Content.parent_id == None)
1222
    #
1223
    #     return resultset.all()
1224
1225
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1226
    # TODO find an other name to filter on is_deleted / is_archived
1227
    def get_all_with_filter(self, parent_id: int=None, content_type: str=content_type_list.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
        """
        Return the contents of a given parent whose deleted / archived /
        temporary flags are equal to the show_deleted / show_archived /
        show_temporary settings of this api instance.

        :param parent_id: id of the parent content; None selects contents
        without parent
        :param content_type: content type slug, or Any_SLUG for no type filter
        :param workspace: workspace passed to the base query
        :return: list of contents
        """
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        resultset = self._base_query(workspace)

        if content_type != content_type_list.Any_SLUG:
            resultset = resultset.filter(Content.type == content_type)

        # Each flag must be equal to the matching setting, it is not a
        # "show also" filter
        resultset = resultset \
            .filter(Content.is_deleted == self._show_deleted) \
            .filter(Content.is_archived == self._show_archived) \
            .filter(Content.is_temporary == self._show_temporary) \
            .filter(Content.parent_id == parent_id)

        return resultset.all()
1244
1245
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1246
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
        """
        Return every content given by the base query, optionally restricted
        to one content type.

        :param content_type: content type slug, or Any_SLUG for no type filter
        :param workspace: workspace passed to the base query
        :return: list of contents
        """
        assert content_type is not None  # DYN_REMOVE

        resultset = self._base_query(workspace)
        if content_type == content_type_list.Any_SLUG:
            return resultset.all()

        return resultset.filter(Content.type == content_type).all()
1255
1256
    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        """
        Build the subquery of the revisions not read by the current user.

        NOTE(review): this method stops after building the subquery: nothing
        is executed, parent_id / content_type / limit are only checked by the
        assertions, and None is implicitly returned despite the annotated
        return type. It looks like an unfinished or partially removed
        implementation - to be confirmed before relying on it.

        :param parent_id: id of parent content (or None)
        :param content_type: content type slug
        :param workspace: workspace passed to the revisions base query
        :param limit: currently unused
        """
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        # Ids of the revisions already read by the current user
        read_revision_ids = self._session.query(RevisionReadStatus.revision_id)
        read_revision_ids = read_revision_ids.filter(
            RevisionReadStatus.user_id == self._user_id
        )

        # Revisions not read yet, in workspaces which are not deleted
        unread_query = self._revisions_base_query(workspace)
        unread_query = unread_query.filter(
            ~ContentRevisionRO.revision_id.in_(read_revision_ids)
        )
        unread_query = unread_query.filter(
            ContentRevisionRO.workspace_id == Workspace.workspace_id
        )
        unread_query = unread_query.filter(Workspace.is_deleted.is_(False))
        not_read_revisions = unread_query.subquery()
1270
1271
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1272
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1273
    #     assert content_type is not None# DYN_REMOVE
1274
    #
1275
    #     resultset = self._base_query(workspace)
1276
    #
1277
    #     if content_type != content_type_list.Any_SLUG:
1278
    #         resultset = resultset.filter(Content.type==content_type)
1279
    #
1280
    #     return resultset.all()
1281
1282
    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]= None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        Get contents sorted by last update, most recent first.
        An update is either a modification of the content itself or of one of
        its comments: a comment is always replaced by its parent content in
        the result.

        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: when given, only contents found after this
        one in the sorted list (i.e. updated before it) are returned.
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content
        """
        resultset = self._get_all_query(workspace=workspace)

        if content_ids:
            is_selected_content = Content.content_id.in_(content_ids)
            is_comment_of_selected_content = and_(
                Content.parent_id.in_(content_ids),
                Content.type == content_type_list.Comment.slug
            )
            resultset = resultset.filter(
                or_(is_selected_content, is_comment_of_selected_content)
            )

        resultset = resultset.order_by(
            desc(ContentRevisionRO.updated),
            desc(ContentRevisionRO.revision_id),
            desc(ContentRevisionRO.content_id),
        )

        active_contents = []
        # contents met before before_content is reached: they are more recent
        # than it and must not be returned
        too_recent_content = []
        before_content_find = False
        for content in resultset:
            is_comment = content_type_list.Comment.slug == content.type
            related_active_content = content.parent if is_comment else content

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            if not self._show_deleted and related_active_content.is_deleted:
                continue
            if not self._show_archived and related_active_content.is_archived:
                continue

            already_seen = (
                related_active_content in active_contents
                or related_active_content in too_recent_content
            )
            if not already_seen:
                if before_content and not before_content_find:
                    too_recent_content.append(related_active_content)
                else:
                    active_contents.append(related_active_content)

                # before_content itself is never returned: the flag is only
                # raised after it has been stored as "too recent"
                if before_content and related_active_content == before_content:  # nopep8
                    before_content_find = True

            if limit and len(active_contents) >= limit:
                break

        return active_contents
1348
1349
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1350
    # usable and efficient for tracim v2 to get content with read/unread status
1351
    # instead of relying on has_new_information_for()
1352
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1353
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1354
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1355
    #     assert content_type is not None# DYN_REMOVE
1356
    #     assert isinstance(content_type, str) # DYN_REMOVE
1357
    #
1358
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1359
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1360
    #
1361
    #     not_read_revisions = self._revisions_base_query(workspace) \
1362
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1363
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1364
    #         .filter(Workspace.is_deleted.is_(False)) \
1365
    #         .subquery()
1366
    #
1367
    #     not_read_content_ids_query = self._session.query(
1368
    #         distinct(not_read_revisions.c.content_id)
1369
    #     )
1370
    #     not_read_content_ids = list(map(
1371
    #         itemgetter(0),
1372
    #         not_read_content_ids_query,
1373
    #     ))
1374
    #
1375
    #     not_read_contents = self._base_query(workspace) \
1376
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1377
    #         .order_by(desc(Content.updated))
1378
    #
1379
    #     if content_type != content_type_list.Any_SLUG:
1380
    #         not_read_contents = not_read_contents.filter(
1381
    #             Content.type==content_type)
1382
    #     else:
1383
    #         not_read_contents = not_read_contents.filter(
1384
    #             Content.type!=content_type_list.Folder.slug)
1385
    #
1386
    #     if parent_id:
1387
    #         not_read_contents = not_read_contents.filter(
1388
    #             Content.parent_id==parent_id)
1389
    #
1390
    #     result = []
1391
    #     for item in not_read_contents:
1392
    #         new_item = None
1393
    #         if content_type_list.Comment.slug == item.type:
1394
    #             new_item = item.parent
1395
    #         else:
1396
    #             new_item = item
1397
    #
1398
    #         # INFO - D.A. - 2015-05-20
1399
    #         # We do not want to show only one item if the last 10 items are
1400
    #         # comments about one thread for example
1401
    #         if new_item not in result:
1402
    #             result.append(new_item)
1403
    #
1404
    #         if len(result) >= limit:
1405
    #             break
1406
    #
1407
    #     return result
1408
1409
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        Store the given dict under the 'allowed_content' key of the content
        properties.

        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        # Update a copy, then assign the whole dict back to the content
        updated_properties = content.properties.copy()
        updated_properties['allowed_content'] = allowed_content_dict
        content.properties = updated_properties
1424
1425
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        Restrict the sub-content types accepted by the given content.

        Every slug of the list is validated first; the content is only
        updated once all of them are known.

        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :raise ContentTypeNotExist: if one slug is not returned by
        content_type_list.endpoint_allowed_types_slug()
        :return: nothing
        """
        # The set of valid slugs does not depend on the loop variable:
        # fetch it once instead of once per requested slug.
        known_slugs = content_type_list.endpoint_allowed_types_slug()
        allowed_content_dict = {}
        for allowed_content_type_slug in allowed_content_type_slug_list:
            if allowed_content_type_slug not in known_slugs:
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
            allowed_content_dict[allowed_content_type_slug] = True

        self._set_allowed_content(content, allowed_content_dict)
1439
1440
    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Return to default allowed_content_types by removing the
        'allowed_content' entry from the content properties.

        Nothing is done when the raw ``_properties`` value is empty or does
        not contain 'allowed_content'.

        :param content: the given content instance
        :return: nothing
        """
        raw_properties = content._properties
        if not raw_properties or 'allowed_content' not in raw_properties:
            return

        # Work on a copy and assign it back, like the other property setters.
        remaining = content.properties.copy()
        del remaining['allowed_content']
        content.properties = remaining
1450
1451
    def set_status(self, content: Content, new_status: str):
        """
        Change the status of a content and flag the revision as a status
        update.

        :param content: the given content instance
        :param new_status: status slug, must be one of
        content_status_list.get_all_slugs_values()
        :raise ValueError: if new_status is not an allowed status slug
        :return: nothing
        """
        if new_status not in content_status_list.get_all_slugs_values():
            raise ValueError('The given value {} is not allowed'.format(new_status))

        content.status = new_status
        content.revision_type = ActionDescription.STATUS_UPDATE
1457
1458
    def move(
        self,
        item: Content,
        new_parent: Content,
        must_stay_in_same_workspace: bool=True,
        new_workspace: Workspace=None,
    ):
        """
        Move a content under a new parent and/or into a new workspace.

        When no new_workspace is given, the item takes the workspace of
        new_parent (if any). The revision is flagged as a MOVE.

        :param item: content to move
        :param new_parent: new parent of the item (can be None)
        :param must_stay_in_same_workspace: if True, refuse a new_parent
        which belongs to another workspace than the item
        :param new_workspace: workspace to move the item into (can be None)
        :raise ValueError: if must_stay_in_same_workspace is set and
        new_parent is in another workspace than the item
        :raise WorkspacesDoNotMatch: if new_parent does not belong to
        new_workspace
        :return: nothing
        """
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        # Validate the parent/workspace pair BEFORE touching the item, so a
        # refused move does not leave the item half-updated.
        if new_workspace and new_parent and \
                new_parent.workspace_id != new_workspace.workspace_id:
            raise WorkspacesDoNotMatch(
                'new parent workspace and new workspace should be the same.'
            )

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
        elif new_parent:
            item.workspace = new_parent.workspace

        # The filename check is done with the updated parent/workspace,
        # ignoring the item itself.
        self._is_filename_available_or_raise(
            item.file_name,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.revision_type = ActionDescription.MOVE
1487
1488
    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revision included. Children not included, see
        "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item
        :param new_label: new label of the new copied item
        :param do_save: save the copied content or not
        :param do_notify: notify copy or not
        :raise ValueError: if neither new_parent nor new_label is given, or
        if both are the same as those of the original item
        :return: Newly copied item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something else than value error
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label
        filename = self._prepare_filename(label, item.file_extension)
        self._is_filename_available_or_raise(filename, workspace, parent)
        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.file_name = filename
            rev.revision_type = ActionDescription.COPY
            # Properties are updated the same way as in the other methods of
            # this class: copy, modify, assign back. Mutating the value
            # returned by the getter in place is presumably not persisted.
            # NOTE(review): confirm against the Content.properties getter.
            properties = rev.properties.copy()
            properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
            rev.properties = properties
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content
1536
1537
    def copy_children(self, origin_content: Content, new_content: Content):
        """
        Copy each direct child of origin_content under new_content.

        :param origin_content: content whose children are copied
        :param new_content: parent given to every copied child
        :return: nothing
        """
        children = origin_content.children
        for origin_child in children:
            self.copy(origin_child, new_content)
1540
1541
    def move_recursively(
        self,
        item: Content,
        new_parent: Content,
        new_workspace: Workspace,
    ):
        """
        Move an item, then move each of its children the same way.

        The item is moved without the "same workspace" restriction and saved
        without notification; every child is then moved under the item, in
        its own new revision.

        :param item: content to move
        :param new_parent: new parent of the item
        :param new_workspace: workspace receiving the item and its children
        :return: nothing
        """
        self.move(
            item,
            new_parent,
            must_stay_in_same_workspace=False,
            new_workspace=new_workspace,
        )
        self.save(item, do_notify=False)

        for sub_item in item.children:
            with new_revision(
                session=self._session,
                tm=transaction.manager,
                content=sub_item
            ):
                self.move_recursively(sub_item, item, new_workspace)
1554
1555
    def is_editable(self, item: Content) -> bool:
        """
        Tell if the given content can be modified.

        A content is editable when it is not read-only, is active and its
        status is itself editable.

        :param item: content to check
        :return: True if the content can be edited
        """
        if item.is_readonly:
            return False
        return item.is_active and item.get_status().is_editable()
1559
1560
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        """
        Update the label and, optionally, the description of a content.

        An empty or None new_content means "keep the current description".

        :param item: content to update
        :param new_label: new label of the content, must not be empty
        :param new_content: new description of the content
        :raise ContentInNotEditableState: if the content is not editable
        :raise SameValueError: if neither the label nor the description
        would change
        :raise EmptyLabelNotAllowed: if new_label is empty
        :return: the updated content
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8

        # Compare against the description that will really be stored:
        # a falsy new_content keeps the current one, so it is not a change.
        new_description = new_content if new_content else item.description
        if item.label == new_label and item.description == new_description:
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
            # Internatization disabled in libcontent for now.
            raise SameValueError('The content did not changed')
        if not new_label:
            raise EmptyLabelNotAllowed()

        filename = self._prepare_filename(new_label, item.file_extension)
        self._is_filename_available_or_raise(
            filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )

        item.owner = self._user
        item.label = new_label
        item.description = new_description  # TODO: convert urls into links
        item.revision_type = ActionDescription.EDITION
        return item
1584
1585
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        """
        Replace the file (name, mimetype and data) attached to a content.

        :param item: content to update
        :param new_filename: new file name, checked for availability in the
        workspace/parent of the item
        :param new_mimetype: mimetype of the new file
        :param new_content: data of the new file
        :raise ContentInNotEditableState: if the content is not editable
        :return: the updated content
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8
        # FIXME - G.M - 2018-09-25 - Repair and do a better same content
        # check. Pyramid hands over a buffered object, so comparing
        # new_content with item.depot_file.file.read() can never match, and
        # reading both sides entirely would be costly with big files.
        # Disabled check:
        # if new_mimetype == item.file_mimetype and \
        #         new_content == item.depot_file.file.read():
        #     raise SameValueError('The content did not changed')
        item.owner = self._user
        self._is_filename_available_or_raise(
            new_filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        file_intent = FileIntent(new_content, new_filename, new_mimetype)
        item.depot_file = file_intent
        item.revision_type = ActionDescription.REVISION
        return item
1613
1614
    def archive(self, content: Content):
        """
        Flag a content as archived and rename its file.

        The file name is rebuilt from '<label>-archived-<date>' so that the
        archived content does not keep its original name.

        :param content: content to archive
        :return: nothing
        """
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        archived_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='archived',
            date=current_date_for_filename()
        )
        archived_filename = self._prepare_filename(
            archived_label,
            content.file_extension,
        )
        self._is_filename_available_or_raise(
            archived_filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id
        )
        content.file_name = archived_filename
        content.revision_type = ActionDescription.ARCHIVING
1634
1635
    def unarchive(self, content: Content):
        """
        Remove the archived flag of a content.

        Only the owner, the flag and the revision type are changed here: the
        file name set by archive() is left as is.

        :param content: content to unarchive
        :return: nothing
        """
        content.owner = self._user
        content.is_archived = False
        content.revision_type = ActionDescription.UNARCHIVING
1639
1640
    def delete(self, content: Content):
        """
        Flag a content as deleted and rename its file.

        The file name is rebuilt from '<label>-deleted-<date>' so that the
        deleted content does not keep its original name.

        :param content: content to delete
        :return: nothing
        """
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        deleted_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='deleted',
            date=current_date_for_filename()
        )
        deleted_filename = self._prepare_filename(
            deleted_label,
            content.file_extension,
        )
        self._is_filename_available_or_raise(
            deleted_filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id
        )
        content.file_name = deleted_filename
        content.revision_type = ActionDescription.DELETION
1660
1661
    def undelete(self, content: Content):
        """
        Remove the deleted flag of a content.

        Only the owner, the flag and the revision type are changed here: the
        file name set by delete() is left as is.

        :param content: content to undelete
        :return: nothing
        """
        content.owner = self._user
        content.is_deleted = False
        content.revision_type = ActionDescription.UNDELETION
1665
1666
    def get_preview_page_nb(self, revision_id: int, file_extension: str) -> typing.Optional[int]:  # nopep8
        """
        Ask the preview manager for the number of pages of a revision file.

        :param revision_id: id of the revision whose file is inspected
        :param file_extension: extension of the file, given to the preview
        manager as file_ext
        :return: number of pages, or None if the mimetype is unsupported or
        if the preview manager raised any other exception (which is logged
        as a warning)
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.get_page_nb(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            return None
        except Exception as e:
            # Best effort: an unexpected preview failure is logged, not raised
            message = "Unknown Preview_Generator Exception Occured : {}".format(str(e))
            logger.warning(self, message)
            logger.warning(self, traceback.format_exc())
            return None
1683
1684 View Code Duplication
    def has_pdf_preview(self, revision_id: int, file_extension: str) -> bool:
        """
        Ask the preview manager if a pdf preview exists for a revision file.

        :param revision_id: id of the revision whose file is inspected
        :param file_extension: extension of the file, given to the preview
        manager as file_ext
        :return: answer of the preview manager, or False if the mimetype is
        unsupported or if the preview manager raised any other exception
        (which is logged as a warning)
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            pdf_available = self.preview_manager.has_pdf_preview(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            return False
        except Exception as e:
            # Best effort: an unexpected preview failure is logged, not raised
            message = "Unknown Preview_Generator Exception Occured : {}".format(str(e))
            logger.warning(self, message)
            logger.warning(self, traceback.format_exc())
            return False
        return pdf_available
1700
1701 View Code Duplication
    def has_jpeg_preview(self, revision_id: int, file_extension: str) -> bool:
        """
        Ask the preview manager if a jpeg preview exists for a revision file.

        :param revision_id: id of the revision whose file is inspected
        :param file_extension: extension of the file, given to the preview
        manager as file_ext
        :return: answer of the preview manager, or False if the mimetype is
        unsupported or if the preview manager raised any other exception
        (which is logged as a warning)
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            jpeg_available = self.preview_manager.has_jpeg_preview(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            return False
        except Exception as e:
            # Best effort: an unexpected preview failure is logged, not raised
            message = "Unknown Preview_Generator Exception Occured : {}".format(str(e))
            logger.warning(self, message)
            logger.warning(self, traceback.format_exc())
            return False
        return jpeg_available
1717
1718
    def mark_read__all(
            self,
            read_datetime: datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Read content of all workspace visible for the user.

        This is mark_read__workspace() called without any workspace.

        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        return self.mark_read__workspace(
            None,
            read_datetime,
            do_flush,
            recursive,
        )
1732
1733
    def mark_read__workspace(
            self,
            workspace : Workspace,
            read_datetime: datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Read content of a workspace visible for the user.

        Only the items returned by get_last_active() which have new
        information for the current user are marked as read.

        :param workspace: workspace given to get_last_active()
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        # Lazy filter: each item is tested right before being marked, as the
        # marking of one item may change the state of the following ones.
        unread_items = (
            candidate for candidate in self.get_last_active(workspace)
            if candidate.has_new_information_for(self._user)
        )
        for unread_item in unread_items:
            self.mark_read(unread_item, read_datetime, do_flush, recursive)
1750
1751
    def mark_read(
            self,
            content: Content,
            read_datetime: datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> Content:
        """
        Read content for the user.

        The algorithm is:
        1. define the read datetime (now, if none is given)
        2. update all revisions related to current Content
        3. if recursive, do the same for the children and, when the content
           is a comment, for its parent and the other comments of the parent

        :param content: content to mark as read
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: the given content
        """
        assert self._user
        assert content

        if not read_datetime:
            read_datetime = datetime.datetime.now()

        revisions_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.content_id == content.content_id
        )
        for viewed_revision in revisions_query.all():
            viewed_revision.read_by[self._user] = read_datetime

        if recursive:
            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            for child in content.get_valid_children():
                self.mark_read(
                    child,
                    read_datetime=read_datetime,
                    do_flush=False,
                )

            if content_type_list.Comment.slug == content.type:
                self.mark_read(
                    content.parent,
                    read_datetime=read_datetime,
                    do_flush=False,
                    recursive=False,
                )
                for sibling_comment in content.parent.get_comments():
                    if sibling_comment != content:
                        self.mark_read(
                            sibling_comment,
                            read_datetime=read_datetime,
                            do_flush=False,
                            recursive=False,
                        )

        if do_flush:
            self.flush()

        return content
1805
1806
    def mark_unread(self, content: Content, do_flush=True) -> Content:
        """
        Remove the "read" mark of the current user from every revision of a
        content, then do the same for its valid children.

        :param content: content to mark as unread
        :param do_flush: flush database
        :return: the given content
        """
        assert self._user
        assert content

        revisions_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.content_id == content.content_id
        )
        for revision in revisions_query.all():
            try:
                del revision.read_by[self._user]
            except KeyError:
                # revision was not read by this user: nothing to remove
                pass

        for child in content.get_valid_children():
            # a single flush is done at the end by the top-level call
            self.mark_unread(child, do_flush=False)

        if do_flush:
            self.flush()

        return content
1826
1827
    def flush(self):
        """
        Flush the database session used by this api.

        :return: nothing
        """
        session = self._session
        session.flush()
1829
1830
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set, must be one of
        ActionDescription.allowed_values() (or None)
        :param do_flush: add the content to the session, flush it and mark
        it as read
        :param do_notify: call do_notify() for the content
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        # Without explicit action, fall back on edition when the revision
        # type was not modified.
        # NOTE(review): SQLAlchemy documents get_history() as returning a
        # 3-tuple (added, unchanged, deleted); if so, the len() test below
        # can never be true - confirm before relying on it.
        if not action_description and (
            content.revision_type is None
            or len(get_history(content.revision, 'revision_type')) <= 0
        ):
            action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flush because of the use
            # of triggers for content creation
            #
            # (when creating a content, actually this is an insert of a new
            # revision in content_revisions ; so the mark_read operation need
            # to get full real data from database before to be prepared.
            session = self._session
            session.add(content)
            session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of Content
            # This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)
1867
1868
    def do_notify(self, content: Content):
        """
        Allow to force notification for a given content. By default, it is
        called during the .save() operation
        :param content: content whose update is notified
        :return: nothing
        """
        notifier = NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        )
        notifier.notify_content_update(content)
1880
1881
    def get_keywords(self, search_string, search_string_separators=None) -> typing.List[str]:
        """
        Split a search string into a list of keywords.

        Keywords are stripped, and empty ones (produced by consecutive or
        trailing separators) are dropped: search() turns each keyword into a
        '%keyword%' pattern, so an empty keyword would match everything.

        :param search_string: a list of coma-separated keywords
        :param search_string_separators: regular expression used to split
        search_string, ContentApi.SEARCH_SEPARATORS if not given
        :return: a list of str (each keyword = 1 entry), empty if
        search_string is empty
        """
        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS

        if not search_string:
            return []

        stripped_keywords = (
            keyword.strip()
            for keyword in re.split(search_string_separators, search_string)
        )
        return [keyword for keyword in stripped_keywords if keyword]
1894
1895
    def search(self, keywords: [str]) -> Query:
        """
        Build the query of contents whose label or description contains at
        least one of the given keywords (case-insensitive match).

        :param keywords: keywords to look for
        :return: the content query, with 'children_revisions' and 'parent'
        joined-loaded, or None if there is no keyword
        """
        if len(keywords) == 0:
            return None

        label_filters = [
            Content.label.ilike('%{}%'.format(keyword))
            for keyword in keywords
        ]
        description_filters = [
            Content.description.ilike('%{}%'.format(keyword))
            for keyword in keywords
        ]

        query = self._hard_filtered_base_query()
        query = query.filter(or_(*(label_filters + description_filters)))
        query = query.options(joinedload('children_revisions'))
        query = query.options(joinedload('parent'))
        return query
1911
1912
    def get_all_types(self) -> typing.List[ContentType]:
        """
        Return the content type of every slug given by
        content_type_list.endpoint_allowed_types_slug().

        :return: list of content types, in the order of the slugs
        """
        slugs = content_type_list.endpoint_allowed_types_slug()
        return [content_type_list.get_one_by_slug(slug) for slug in slugs]
1919
1920
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1921
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Update and return list with content under archived/deleted removed.

        The given list is modified in place (and returned), in a single
        pass: every content is tested once, then the list is rewritten only
        if something has to be removed.

        :param contents: List of contents to parse
        :return: the same list object, without the contents which are under
        a deleted or archived content
        """
        available = [
            content for content in contents
            if not (
                self.content_under_deleted(content)
                or self.content_under_archived(content)
            )
        ]
        if len(available) != len(contents):
            # slice assignment keeps the identity of the caller's list
            contents[:] = available
        return contents
1933
1934
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1935
    def content_under_deleted(self, content: Content) -> bool:
        """
        Tell if one of the ancestors of the content is deleted.

        The content itself is not checked, only its parent chain.

        :param content: content to check
        :return: True if any ancestor is deleted
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_deleted:
                return True
            ancestor = ancestor.parent
        return False
1942
1943
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1944
    def content_under_archived(self, content: Content) -> bool:
        """
        Tell if one of the ancestors of the content is archived.

        The content itself is not checked, only its parent chain.

        :param content: content to check
        :return: True if any ancestor is archived
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_archived:
                return True
            ancestor = ancestor.parent
        return False
1951
1952
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1953
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return Content who contains given property.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.

        The match is a textual LIKE on the raw properties of the content,
        looking for '"<property_name>": "<property_value>"'.

        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace who contains Content
        :return: Found Content
        """
        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        base_query = self._base_query(workspace=workspace)
        like_pattern = '%"{property_name}": "{property_value}"%'.format(
            property_name=property_name,
            property_value=property_value,
        )
        matching_contents = base_query.filter(
            Content._properties.like(like_pattern)
        )
        return matching_contents.one()
1980
1981
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1982
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a folder label

        The label is the translation of 'New folder {0}', numbered with the
        count of contents whose label starts with the translated
        'New folder' (in the workspace, and under parent if given), plus one.

        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None)
        :return: Generated folder name
        """
        _ = self.translator.get_translation
        base_query = self._base_query(workspace=workspace)
        label_prefix = '{0}%'.format(
            _('New folder'),
        )
        query = base_query.filter(Content.label.ilike(label_prefix))
        if parent:
            query = query.filter(Content.parent == parent)

        label_template = _('New folder {0}')
        next_number = query.count() + 1
        return label_template.format(
            next_number,
        )
2004