Passed
Pull Request — develop (#124)
by inkhey
01:53
created

backend.tracim_backend.lib.core.content   F

Complexity

Total Complexity 247

Size/Duplication

Total Lines 1948
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 247
eloc 1081
dl 0
loc 1948
rs 1.276
c 0
b 0
f 0

70 Methods

Rating   Name   Duplication   Size   Complexity  
A ContentApi.get_content_in_context() 0 2 1
A ContentApi._base_query() 0 18 5
A ContentApi._get_revision_join() 0 12 1
A ContentApi.__real_base_query() 0 30 5
A ContentApi._is_filename_available_or_raise() 0 31 3
A ContentApi.get_revision_in_context() 0 3 1
A ContentApi.__revisions_real_base_query() 0 21 4
A ContentApi._hard_filtered_base_query() 0 34 4
A ContentApi.__init__() 0 31 4
A ContentApi._revisions_base_query() 0 16 4
A ContentApi.sort_tree_items() 0 14 2
A ContentApi.get_canonical_query() 0 7 1
A ContentApi.show() 0 27 1
A ContentApi.sort_content() 0 7 1
A ContentApi._prepare_filename() 0 15 1
A ContentApi.get_base_query() 0 5 1
B ContentApi._is_filename_available() 0 60 6
F ContentApi.create() 0 79 18
A ContentApi.get_one_from_revision() 0 19 2
A ContentApi.create_comment() 0 24 4
A ContentApi.search() 0 16 2
A ContentApi.get_jpg_preview_allowed_dim() 0 7 1
A ContentApi.get_last_unread() 0 14 1
C ContentApi.get_jpg_preview_path() 0 60 10
D ContentApi._get_all_query() 0 78 13
A ContentApi.set_status() 0 6 2
A ContentApi.update_file_data() 0 28 2
A ContentApi.mark_read__workspace() 0 17 3
A ContentApi.get_one_revision() 0 19 3
B ContentApi.get_one() 0 36 6
A ContentApi.set_allowed_content() 0 14 3
A ContentApi.get_keywords() 0 13 2
A ContentApi.content_under_archived() 0 7 4
A ContentApi.has_pdf_preview() 0 10 2
B ContentApi.copy() 0 48 8
A ContentApi.unarchive() 0 4 1
A ContentApi.filter_query_for_content_label_as_path() 0 49 2
A ContentApi.mark_unread() 0 20 5
A ContentApi.get_one_by_label_and_parent_labels() 0 52 3
A ContentApi.get_all_without_exception() 0 9 2
A ContentApi.copy_children() 0 3 2
A ContentApi.has_jpeg_preview() 0 10 2
A ContentApi.delete() 0 20 1
A ContentApi._set_allowed_content() 0 15 1
A ContentApi.flush() 0 2 1
A ContentApi.move_recursively() 0 13 3
A ContentApi.exclude_unavailable() 0 12 4
A ContentApi.mark_read__all() 0 14 1
A ContentApi.archive() 0 20 1
A ContentApi.get_folder_with_workspace_path_labels() 0 42 3
C ContentApi.mark_read() 0 54 9
A ContentApi.get_all_types() 0 7 2
A ContentApi.generate_folder_label() 0 21 2
A ContentApi.get_one_by_label_and_parent() 0 42 3
A ContentApi.get_preview_page_nb() 0 10 2
A ContentApi.is_editable() 0 4 1
A ContentApi.get_all_with_filter() 0 17 2
A ContentApi.get_one_revision_filepath() 0 12 1
B ContentApi.save() 0 37 7
A ContentApi.get_full_pdf_preview_path() 0 17 3
A ContentApi.find_one_by_unique_property() 0 27 1
F ContentApi.get_last_active() 0 66 16
A ContentApi.get_all() 0 23 1
A ContentApi.do_notify() 0 12 1
A ContentApi.get_pdf_preview_path() 0 38 4
A ContentApi.content_under_deleted() 0 7 4
A ContentApi.restore_content_default_allowed_content() 0 10 3
A ContentApi.undelete() 0 4 1
B ContentApi.update_content() 0 24 6
B ContentApi.move() 0 29 8

2 Functions

Rating   Name   Duplication   Size   Complexity  
A compare_tree_items_for_sorting_by_type_and_name() 0 5 1
B compare_content_for_sorting_by_type_and_name() 0 37 6

How to fix

Complexity

Complex classes like ContentApi in backend.tracim_backend.lib.core.content often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.
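
The prefix/suffix heuristic described above can be sketched in a few lines. This is only an illustration, not part of the analysed module: the helper name group_by_shared_token, the GENERIC_TOKENS set and the min_size threshold are assumptions made for this example. The method names are taken from the table above.

```python
from collections import defaultdict
from typing import Dict, FrozenSet, Iterable, List

# Hypothetical list of verbs too generic to indicate a cohesive component.
GENERIC_TOKENS: FrozenSet[str] = frozenset({"get", "set", "has", "is"})


def group_by_shared_token(
    names: Iterable[str],
    min_size: int = 3,
    ignored: FrozenSet[str] = GENERIC_TOKENS,
) -> Dict[str, List[str]]:
    """Group method names by the underscore-separated tokens they share."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        # A set, so a name is listed once per token; empty tokens produced
        # by leading or double underscores are dropped.
        tokens = {token for token in name.split("_") if token}
        for token in tokens - ignored:
            groups[token].append(name)
    # Keep only tokens shared by at least min_size methods.
    return {
        token: sorted(members)
        for token, members in groups.items()
        if len(members) >= min_size
    }


method_names = [
    "get_jpg_preview_allowed_dim",
    "get_jpg_preview_path",
    "get_pdf_preview_path",
    "get_full_pdf_preview_path",
    "has_pdf_preview",
    "has_jpeg_preview",
    "get_preview_page_nb",
    "mark_read",
    "mark_unread",
    "mark_read__all",
    "mark_read__workspace",
]
candidates = group_by_shared_token(method_names)
```

With this input, the "preview" and "mark" tokens stand out as the largest groups, which suggests that preview handling and read-status handling are candidates for extraction.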

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
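
A minimal sketch of what Extract Class could look like for the preview-related methods follows. All class names here (FakePreviewManager, ContentPreviewApi, SlimContentApi) are hypothetical, and the has_pdf_preview signature is an assumption, since the real one is not shown in this part of the listing.

```python
class FakePreviewManager:
    """Hypothetical stand-in for the real preview manager."""

    def has_pdf_preview(self, file_path: str) -> bool:
        # Toy rule for the example only.
        return file_path.endswith((".odt", ".pdf"))


class ContentPreviewApi:
    """Extracted class: owns everything related to previews."""

    def __init__(self, preview_manager) -> None:
        self._preview_manager = preview_manager

    def has_pdf_preview(self, file_path: str) -> bool:
        return self._preview_manager.has_pdf_preview(file_path)


class SlimContentApi:
    """What remains of the original class after the extraction."""

    def __init__(self, preview_api: ContentPreviewApi) -> None:
        self.previews = preview_api

    def has_pdf_preview(self, file_path: str) -> bool:
        # Thin delegation keeps existing callers working during migration.
        return self.previews.has_pdf_preview(file_path)


api = SlimContentApi(ContentPreviewApi(FakePreviewManager()))
```

The delegating method can be removed once callers use api.previews directly, which is when the complexity of the original class actually drops.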

# -*- coding: utf-8 -*-
import datetime
import os
import re
import typing
from contextlib import contextmanager

import sqlalchemy
import transaction
from depot.io.utils import FileIntent
from depot.manager import DepotManager
from preview_generator.exception import UnavailablePreviewType
from preview_generator.exception import UnsupportedMimeType
from preview_generator.manager import PreviewManager
from sqlalchemy import desc
from sqlalchemy import func
from sqlalchemy import or_
from sqlalchemy.orm import Query
from sqlalchemy.orm import aliased
from sqlalchemy.orm import joinedload
from sqlalchemy.orm.attributes import QueryableAttribute
from sqlalchemy.orm.attributes import get_history
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import and_

from tracim_backend.app_models.contents import content_status_list
from tracim_backend.app_models.contents import content_type_list
from tracim_backend.app_models.contents import FOLDER_TYPE
from tracim_backend.app_models.contents import ContentStatus
from tracim_backend.app_models.contents import ContentType
from tracim_backend.app_models.contents import GlobalStatus
from tracim_backend.exceptions import ContentInNotEditableState
from tracim_backend.exceptions import ContentFilenameAlreadyUsedInFolder
from tracim_backend.exceptions import ContentNotFound
from tracim_backend.exceptions import ContentTypeNotExist
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
from tracim_backend.exceptions import EmptyLabelNotAllowed
from tracim_backend.exceptions import PageOfPreviewNotFound
from tracim_backend.exceptions import PreviewDimNotAllowed
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
from tracim_backend.exceptions import SameValueError
from tracim_backend.exceptions import TracimUnavailablePreviewType
from tracim_backend.exceptions import UnallowedSubContent
from tracim_backend.exceptions import UnavailablePreview
from tracim_backend.exceptions import WorkspacesDoNotMatch
from tracim_backend.lib.core.notifications import NotifierFactory
from tracim_backend.lib.utils.logger import logger
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
from tracim_backend.lib.utils.translation import Translator
from tracim_backend.lib.utils.utils import cmp_to_key
from tracim_backend.lib.utils.utils import current_date_for_filename
from tracim_backend.lib.utils.utils import preview_manager_page_format
from tracim_backend.models.auth import User
from tracim_backend.models.context_models import ContentInContext
from tracim_backend.models.context_models import PreviewAllowedDim
from tracim_backend.models.context_models import RevisionInContext
from tracim_backend.models.data import ActionDescription
from tracim_backend.models.data import Content
from tracim_backend.models.data import ContentRevisionRO
from tracim_backend.models.data import NodeTreeItem
from tracim_backend.models.data import RevisionReadStatus
from tracim_backend.models.data import UserRoleInWorkspace
from tracim_backend.models.data import Workspace
from tracim_backend.models.revision_protection import new_revision

__author__ = 'damien'


# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
def compare_content_for_sorting_by_type_and_name(
        content1: Content,
        content2: Content
) -> int:
    """
    Compare two contents by type first, then by case-insensitive label.

    :param content1: first content to compare
    :param content2: second content to compare
    :return:    1 if content1 > content2
                -1 if content1 < content2
                0 if content1 = content2
    """

    if content1.type == content2.type:
        if content1.get_label().lower() > content2.get_label().lower():
            return 1
        elif content1.get_label().lower() < content2.get_label().lower():
            return -1
        return 0
    else:
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
        content_type_order = [
            content_type_list.Folder.slug,
            content_type_list.Page.slug,
            content_type_list.Thread.slug,
            content_type_list.File.slug,
        ]

        # NOTE: list.index() raises ValueError for a type that is missing
        # from content_type_order.
        content_1_type_index = content_type_order.index(content1.type)
        content_2_type_index = content_type_order.index(content2.type)
        result = content_1_type_index - content_2_type_index

        if result < 0:
            return -1
        elif result > 0:
            return 1
        else:
            return 0

# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
def compare_tree_items_for_sorting_by_type_and_name(
        item1: NodeTreeItem,
        item2: NodeTreeItem
) -> int:
    return compare_content_for_sorting_by_type_and_name(item1.node, item2.node)


class ContentApi(object):

    SEARCH_SEPARATORS = ',| '
    SEARCH_DEFAULT_RESULT_NB = 50

    # DISPLAYABLE_CONTENTS = (
    #     content_type_list.Folder.slug,
    #     content_type_list.File.slug,
    #     content_type_list.Comment.slug,
    #     content_type_list.Thread.slug,
    #     content_type_list.Page.slug,
    #     content_type_list.Page.slugLegacy,
    #     ContentType.MarkdownPage,
    # )

    def __init__(
            self,
            session: Session,
            current_user: typing.Optional[User],
            config,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
            show_active: bool = True,
            all_content_in_treeview: bool = True,
            force_show_all_types: bool = False,
            disable_user_workspaces_filter: bool = False,
    ) -> None:
        self._session = session
        self._user = current_user
        self._config = config
        self._user_id = current_user.user_id if current_user else None
        self._show_archived = show_archived
        self._show_deleted = show_deleted
        self._show_temporary = show_temporary
        self._show_active = show_active
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
        self._force_show_all_types = force_show_all_types
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
        self.preview_manager = PreviewManager(self._config.PREVIEW_CACHE_DIR, create_folder=True)  # nopep8
        default_lang = None
        if self._user:
            default_lang = self._user.lang
        if not default_lang:
            default_lang = DEFAULT_FALLBACK_LANG
        self.translator = Translator(app_config=self._config, default_lang=default_lang)  # nopep8

    @contextmanager
    def show(
            self,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
    ) -> typing.Generator['ContentApi', None, None]:
        """
        Use this method as a context manager to override the show_archived,
        show_deleted and show_temporary properties inside the context.
        The previous values are restored on exit.
        :param show_archived: show archived contents
        :param show_deleted:  show deleted contents
        :param show_temporary:  show temporary contents
        """
        previous_show_archived = self._show_archived
        previous_show_deleted = self._show_deleted
        previous_show_temporary = self._show_temporary

        try:
            self._show_archived = show_archived
            self._show_deleted = show_deleted
            self._show_temporary = show_temporary
            yield self
        finally:
            self._show_archived = previous_show_archived
            self._show_deleted = previous_show_deleted
            self._show_temporary = previous_show_temporary

    def get_content_in_context(self, content: Content) -> ContentInContext:
        return ContentInContext(content, self._session, self._config, self._user)  # nopep8

    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
        # TODO - G.M - 2018-06-173 - create revision in context object
        return RevisionInContext(revision, self._session, self._config, self._user)  # nopep8

    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
        """
        Return the Content/ContentRevision query join condition
        :return: Content/ContentRevision query join condition
        """
        return and_(Content.id == ContentRevisionRO.content_id,
                    ContentRevisionRO.revision_id == self._session.query(
                        ContentRevisionRO.revision_id)
                    .filter(ContentRevisionRO.content_id == Content.id)
                    .order_by(ContentRevisionRO.revision_id.desc())
                    .limit(1)
                    .correlate(Content))

    def get_canonical_query(self) -> Query:
        """
        Return the Content/ContentRevision base query that joins these
        tables on the last revision.
        :return: Content/ContentRevision Query
        """
        return self._session.query(Content)\
            .join(ContentRevisionRO, self._get_revision_join())

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    @classmethod
    def sort_tree_items(
        cls,
        content_list: typing.List[NodeTreeItem],
    ) -> typing.List[NodeTreeItem]:
        # The list is sorted in place and returned for convenience.
        content_list.sort(key=cmp_to_key(
            compare_tree_items_for_sorting_by_type_and_name,
        ))

        return content_list

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    @classmethod
    def sort_content(
        cls,
        content_list: typing.List[Content],
    ) -> typing.List[Content]:
        content_list.sort(key=cmp_to_key(compare_content_for_sorting_by_type_and_name))
        return content_list

    def __real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        result = self.get_canonical_query()

        # Exclude non displayable types
        if not self._force_show_all_types:
            result = result.filter(Content.type.in_(content_type_list.query_allowed_types_slugs()))

        if workspace:
            result = result.filter(Content.workspace_id == workspace.workspace_id)

        # Security layer: if user provided, filter
        # with user workspaces privileges
        if self._user and not self._disable_user_workspaces_filter:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces
            workspace_ids = [r.workspace_id for r in user.roles
                             if r.role >= UserRoleInWorkspace.READER]
            result = result.filter(or_(
                Content.workspace_id.in_(workspace_ids),
                # Also allow access to content outside any workspace when
                # the user owns it
                and_(
                    Content.workspace_id == None,
                    Content.owner_id == self._user_id,
                )
            ))

        return result

    def _base_query(self, workspace: Workspace = None) -> Query:
        result = self.__real_base_query(workspace)

        if not self._show_active:
            result = result.filter(or_(
                Content.is_deleted == True,
                Content.is_archived == True,
            ))
        if not self._show_deleted:
            result = result.filter(Content.is_deleted == False)

        if not self._show_archived:
            result = result.filter(Content.is_archived == False)

        if not self._show_temporary:
            result = result.filter(Content.is_temporary == False)

        return result

    def __revisions_real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        result = self._session.query(ContentRevisionRO)

        # Exclude non displayable types.
        # The DISPLAYABLE_CONTENTS class attribute is commented out above, so
        # referencing it would raise AttributeError; use the same allowed-type
        # list as __real_base_query() instead.
        if not self._force_show_all_types:
            result = result.filter(Content.type.in_(content_type_list.query_allowed_types_slugs()))

        if workspace:
            result = result.filter(ContentRevisionRO.workspace_id == workspace.workspace_id)

        if self._user:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces
            workspace_ids = [r.workspace_id for r in user.roles
                             if r.role >= UserRoleInWorkspace.READER]
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))

        return result

    def _revisions_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        result = self.__revisions_real_base_query(workspace)

        if not self._show_deleted:
            result = result.filter(ContentRevisionRO.is_deleted == False)

        if not self._show_archived:
            result = result.filter(ContentRevisionRO.is_archived == False)

        if not self._show_temporary:
            result = result.filter(Content.is_temporary == False)

        return result

    def _hard_filtered_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        """
        Same as _base_query, but filtering on is_deleted, is_archived and
        is_temporary also applies to the parent content. This is required
        by the search() function, which also searches in comments (for
        example) that may be 'not deleted' while the associated content
        is deleted.

        :param workspace: restrict the query to this workspace, if given
        :return: filtered Content query
        """
        result = self.__real_base_query(workspace)

        # NOTE: Query.join() is an inner join by default, so content without
        # a parent is excluded as soon as one of these filters applies.
        if not self._show_deleted:
            parent = aliased(Content)
            result = result.join(parent, Content.parent).\
                filter(Content.is_deleted == False).\
                filter(parent.is_deleted == False)

        if not self._show_archived:
            parent = aliased(Content)
            result = result.join(parent, Content.parent).\
                filter(Content.is_archived == False).\
                filter(parent.is_archived == False)

        if not self._show_temporary:
            parent = aliased(Content)
            result = result.join(parent, Content.parent). \
                filter(Content.is_temporary == False). \
                filter(parent.is_temporary == False)

        return result

    def get_base_query(
        self,
        workspace: Workspace,
    ) -> Query:
        return self._base_query(workspace)

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
    #     """
    #     This method returns child items (folders or items) for left bar treeview.
    #
    #     :param parent:
    #     :param workspace:
    #     :param filter_by_allowed_content_types:
    #     :param removed_item_ids:
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
    #     :return:
    #     """
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
    #     removed_item_ids = removed_item_ids or []  # FDV
    #
    #     if not allowed_node_types:
    #         allowed_node_types = [content_type_list.Folder.slug]
    #     elif allowed_node_types==content_type_list.Any_SLUG:
    #         allowed_node_types = ContentType.all()
    #
    #     parent_id = parent.content_id if parent else None
    #     folders = self._base_query(workspace).\
    #         filter(Content.parent_id==parent_id).\
    #         filter(Content.type.in_(allowed_node_types)).\
    #         filter(Content.content_id.notin_(removed_item_ids)).\
    #         all()
    #
    #     if not filter_by_allowed_content_types or \
    #                     len(filter_by_allowed_content_types)<=0:
    #         # Standard case for the left treeview: we want to show all contents
    #         # in the left treeview... so we still filter because for example
    #         # comments must not appear in the treeview
    #         return [folder for folder in folders \
    #                 if folder.type in ContentType.allowed_types_for_folding()]
    #
    #     # Now this is a case of Folders only (used for moving content)
    #     # When moving a content, you must get only folders that allow to be filled
    #     # with the type of content you want to move
    #     result = []
    #     for folder in folders:
    #         for allowed_content_type in filter_by_allowed_content_types:
    #
    #             is_folder = folder.type == content_type_list.Folder.slug
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
    #
    #             if is_folder and content_type__allowed:
    #                 result.append(folder)
    #                 break
    #
    #     return result

    def _is_filename_available(
            self,
            filename: str,
            workspace: Workspace,
            parent: Content = None,
            exclude_content_id: int = None,
    ) -> bool:
        """
        Check if content filename is free
        :param filename: content filename
        :param workspace: workspace of the content
        :param parent: parent of the content
        :param exclude_content_id: exclude a specific content_id (useful
        to check against other contents when updating a content)
        :return: True if content filename is available
        """
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
        # with empty filename like comment.
        assert filename
        assert workspace
        label, file_extension = os.path.splitext(filename)
        query = self.get_base_query(workspace)

        if parent:
            query = query.filter(Content.parent_id == parent.content_id)
        else:
            query = query.filter(Content.parent_id == None)

        if exclude_content_id:
            query = query.filter(Content.content_id != exclude_content_id)
        query = query.filter(Content.workspace_id == workspace.workspace_id)

        nb_content_with_the_filename = query.filter(
            Content.file_name == filename
        ).count()
        if nb_content_with_the_filename == 0:
            return True
        elif nb_content_with_the_filename == 1:
            return False
        else:
            # Each fragment ends with a space so the concatenated message
            # stays readable.
            critical_error_text = 'Something is wrong in the database ! '\
                                  'Content filename should be unique ' \
                                  'in a same folder in database ' \
                                  'but you have {nb} content with ' \
                                  'filename {filename} ' \
                                  'in workspace {workspace_id}'

            critical_error_text = critical_error_text.format(
                nb=nb_content_with_the_filename,
                filename=filename,
                workspace_id=workspace.workspace_id,
            )
            if parent:
                # Report the id of the parent itself, not of its own parent.
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
                    text=critical_error_text,
                    parent_id=parent.content_id
                )
            logger.critical(self, critical_error_text)
            return False

    def _prepare_filename(
            self,
            label: str,
            file_extension: str,
    ) -> str:
        """
        Generate the file_name from label and file_extension
        :return: label and file_extension concatenated
        """
        # TODO - G.M - 2018-10-11 - Find a way to
        # Refactor this in order to use same method
        # in both contentLib and .file_name of content
        return '{label}{file_extension}'.format(
            label=label,
            file_extension=file_extension
        )

    def _is_filename_available_or_raise(
        self,
        filename: str,
        workspace: Workspace,
        parent: Content = None,
        exclude_content_id: int = None,
    ) -> bool:
        """
        Same as _is_filename_available but raises an exception instead of
        returning False if the content filename is already used
        """
        if self._is_filename_available(
            filename,
            workspace,
            parent,
            exclude_content_id
        ):
            return True
        # INFO - G.M - 2018-10-11 - prepare exception message
        exception_message = 'A Content already exist with the same filename ' \
            '{filename}  in workspace {workspace_id}'
        exception_message = exception_message.format(
            filename=filename,
            workspace_id=workspace.workspace_id,
        )
        if parent:
            # Report the id of the parent itself, not of its own parent.
            exception_message = '{text} and parent as content {parent_id}'.format(
                text=exception_message,
                parent_id=parent.content_id
            )
        raise ContentFilenameAlreadyUsedInFolder(exception_message)

    def create(
            self,
            content_type_slug: str,
            workspace: Workspace,
            parent: Content = None,
            label: str = '',
            filename: str = '',
            do_save=False,
            is_temporary: bool = False,
            do_notify=True,
    ) -> Content:
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != content_type_list.Any_SLUG
        assert not (label and filename)

        if content_type_slug == FOLDER_TYPE and not label:
            label = self.generate_folder_label(workspace, parent)

        # TODO BS 2018-08-13: Although workspace is required, create_comment
        # can call this method with None. create_comment must be updated to
        # require the workspace.
        if not workspace:
            workspace = parent.workspace

        content_type = content_type_list.get_one_by_slug(content_type_slug)
        if parent and parent.properties and 'allowed_content' in parent.properties:
            if content_type.slug not in parent.properties['allowed_content'] or not parent.properties['allowed_content'][content_type.slug]:
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))
        if not workspace and parent:
            workspace = parent.workspace

        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        content_id=workspace.workspace_id,
                    )
                )
        content = Content()
        if label:
            file_extension = ''
            if content_type.file_extension:
                file_extension = content_type.file_extension
            filename = self._prepare_filename(label, file_extension)
            self._is_filename_available_or_raise(
                filename,
                workspace,
                parent,
            )
            # TODO - G.M - 2018-10-15 - Set file extension and label
            # explicitly instead of filename in order to have correct
            # label/file-extension separation.
            content.label = label
            content.file_extension = file_extension
        elif filename:
            self._is_filename_available_or_raise(
                filename,
                workspace,
                parent
            )
            # INFO - G.M - 2018-07-04 - Setting file_name automatically
            # sets label and file_extension
            content.file_name = filename
        else:
            if content_type_slug == content_type_list.Comment.slug:
                # INFO - G.M - 2018-07-16 - Default label for comments is
                # empty string.
                content.label = ''
            else:
                raise EmptyLabelNotAllowed(
                    'Content of type {} should have a valid label'.format(content_type_slug)  # nopep8
                )

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content

    def create_comment(
            self,
            workspace: Workspace = None,
            parent: Content = None,
            content: str = '',
            do_save=False,
            do_notify=True,
    ) -> Content:
        # TODO: check parent allowed_type and workspace allowed_type
        assert parent and parent.type != FOLDER_TYPE
        if not self.is_editable(parent):
            raise ContentInNotEditableState(
                "Can't create comment on content, you need to change its status or state (deleted/archived) before any change."
            )
        if not content:
            raise EmptyCommentContentNotAllowed()

        item = self.create(
            content_type_slug=content_type_list.Comment.slug,
            workspace=workspace,
            parent=parent,
            do_notify=False,
            do_save=False,
            label='',
        )
        item.description = content
        item.revision_type = ActionDescription.COMMENT

        if do_save:
            self.save(item, ActionDescription.COMMENT, do_notify=do_notify)
        return item

    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace = None, revision_id=None) -> Content:
        """
        This method is a hack to convert a node revision item into a node
        :param content_id:
        :param content_type:
        :param workspace:
        :param revision_id:
        :return:
        """

        content = self.get_one(content_id, content_type, workspace)
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()

        if revision.content_id == content.content_id:
            content.revision_to_serialize = revision.revision_id
        else:
            raise ValueError('Revision not found for given content')

        return content

    def get_one(
            self,
            content_id: int,
            content_type: str,
            workspace: Workspace=None,
            parent: Content=None,
            ignore_content_state_filter: bool=False
    ) -> typing.Optional[Content]:

        if not content_id:
            return None

        if ignore_content_state_filter:
            base_request = self.__real_base_query(workspace)
        else:
            base_request = self._base_query(workspace)

        base_request = base_request.filter(Content.content_id == content_id)

        if content_type != content_type_list.Any_SLUG:
            base_request = base_request.filter(Content.type == content_type)

        if parent:
            base_request = base_request.filter(Content.parent_id == parent.content_id)  # nopep8

        try:
            content = base_request.one()
        except NoResultFound as exc:
            # TODO - G.M - 2018-07-16 - Add better support for all the
            # different error cases that can happen here, like: content
            # doesn't exist, wrong parent, wrong content_type, wrong workspace,
            # wrong access to this workspace, wrong base filter according
            # to content_status.
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
        return content

    def get_one_revision(self, revision_id: int = None, content: Content = None) -> ContentRevisionRO:  # nopep8
        """
        This method allows us to directly get any revision by its id
        :param revision_id: The id of the content revision we want to return
        :param content: The content related to the revision; if None, do not
        check that the revision is related to this content.
        :return: The matching ContentRevisionRO
        """
        assert revision_id is not None  # DYN_REMOVE

        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
        if content and revision.content_id != content.content_id:
            raise RevisionDoesNotMatchThisContent(
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
                    revision_id=revision.revision_id,
                    content_id=content.content_id,
                )
            )
        return revision

    # INFO - A.P - 2017-07-03 - python file object getter,
    # in case we build a version of the preview manager that allows
    # pythonic access to files
    # def get_one_revision_file(self, revision_id: int = None):
    #     """
    #     This function allows us to directly get a Python file object from its
    #     revision identifier.
    #     :param revision_id: The revision id of the file we want to return
    #     :return: The corresponding Python file object
    #     """
    #     revision = self.get_one_revision(revision_id)
    #     return DepotManager.get().get(revision.depot_file)

    def get_one_revision_filepath(self, revision_id: int = None) -> str:
        """
        This method allows us to directly get a file path from its revision
        identifier.
        :param revision_id: The revision id of the filepath we want to return
        :return: The corresponding filepath
        """
        revision = self.get_one_revision(revision_id)
        depot = DepotManager.get()
        depot_stored_file = depot.get(revision.depot_file)  # type: StoredFile
        depot_file_path = depot_stored_file._file_path  # type: str
        return depot_file_path

    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
    def get_one_by_label_and_parent(
            self,
            content_label: str,
            content_parent: Content=None,
    ) -> Content:
        """
        This method lets us query the database for a Content by its name and parent
        :param content_label: Either the content's label or the content's filename if the label is None
        :param content_parent: The parent content; its workspace is used to scope the query
        :return: The corresponding Content
        """
        workspace = content_parent.workspace if content_parent else None
        query = self._base_query(workspace)
        parent_id = content_parent.content_id if content_parent else None
        query = query.filter(Content.parent_id == parent_id)

        file_name, file_extension = os.path.splitext(content_label)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid hardcoded content_type code here
        return query.filter(
            or_(
                and_(
                    Content.type == content_type_list.File.slug,
                    Content.label == file_name,
                    Content.file_extension == file_extension,
                ),
                and_(
                    Content.type == content_type_list.Thread.slug,
                    Content.label == file_name,
                ),
                and_(
                    Content.type == content_type_list.Page.slug,
                    Content.label == file_name,
                ),
                and_(
                    Content.type == content_type_list.Folder.slug,
                    Content.label == content_label,
                ),
            )
        ).one()

    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: [str]=None,
    ):
        """
        Return content with its label, workspace and parent labels (optional)
        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing all of this
        :param content_parent_labels: Ordered list of labels representing path
            of folder (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :return: Found Content
        """
        query = self._base_query(workspace)
        parent_folder = None

        # Grab content parent folder if parent path given
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )

        # Build query to find content by label
        content_query = self.filter_query_for_content_label_as_path(
            query=query,
            content_label_as_file=content_label,
        )

        # Modify query to apply parent folder filter if any
        if parent_folder:
            content_query = content_query.filter(
                Content.parent_id == parent_folder.content_id,
            )
        else:
            content_query = content_query.filter(
                Content.parent_id == None,
            )

        # Filter with workspace
        content_query = content_query.filter(
            Content.workspace_id == workspace.workspace_id,
        )

        # Return the content
        return content_query\
            .order_by(
                Content.revision_id.desc(),
            )\
            .one()

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: [str],
            workspace: Workspace,
    ) -> Content:
        """
        Return a Content folder for given relative path.
        TODO BS 20161124: Not safe if web interface allows duplicate folder names
        :param path_labels: List of labels representing path of folder
        (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder
        """
        query = self._base_query(workspace)
        folder = None

        for label in path_labels:
            # Filter query on label
            folder_query = query \
                .filter(
                    Content.type == content_type_list.Folder.slug,
                    Content.label == label,
                    Content.workspace_id == workspace.workspace_id,
                )

            # Search into parent folder (if already deep)
            if folder:
                folder_query = folder_query\
                    .filter(
                        Content.parent_id == folder.content_id,
                    )
            else:
                folder_query = folder_query \
                    .filter(Content.parent_id == None)

            # Get first corresponding folder
            folder = folder_query \
                .order_by(Content.revision_id.desc()) \
                .one()

        return folder

    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Apply normalised filters to find the Content matching the given label.
        :param query: query to modify
        :param content_label_as_file: label in its file-name form (label plus
        extension), as exposed by Content.file_name.
        :param is_case_sensitive: whether the match is case-sensitive
        :return: modified query
        """
        file_name, file_extension = os.path.splitext(content_label_as_file)

        label_filter = Content.label == content_label_as_file
        file_name_filter = Content.label == file_name
        file_extension_filter = Content.file_extension == file_extension

        if not is_case_sensitive:
            label_filter = func.lower(Content.label) == \
                           func.lower(content_label_as_file)
            file_name_filter = func.lower(Content.label) == \
                               func.lower(file_name)
            file_extension_filter = func.lower(Content.file_extension) == \
                                    func.lower(file_extension)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid hardcoded content_type code here
        return query.filter(or_(
            and_(
                Content.type == content_type_list.File.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == content_type_list.Thread.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == content_type_list.Page.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == content_type_list.Folder.slug,
                label_filter,
            ),
        ))

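    # NOTE - illustrative sketch only, not part of ContentApi. It mirrors the
    # or_/and_ label filter above in plain Python for a single row, assuming
    # the label is split with os.path.splitext. The slug values and the
    # label_matches() helper are hypothetical names chosen for this example.

```python
import os.path

# Hypothetical stand-ins for the content type slugs used in the query above.
FILE, THREAD, PAGE, FOLDER = "file", "thread", "page", "folder"


def label_matches(content_type, label, file_extension, requested,
                  case_sensitive=False):
    """Return True if one (type, label, extension) row matches `requested`."""
    name, ext = os.path.splitext(requested)
    # Case-insensitive matching lowers both sides, like func.lower() above.
    norm = (lambda s: s) if case_sensitive else str.lower
    if content_type == FOLDER:
        # Folders are compared against the whole requested label.
        return norm(label) == norm(requested)
    if content_type in (FILE, THREAD, PAGE):
        # Other types compare the name part and the extension separately.
        return (norm(label) == norm(name)
                and norm(file_extension) == norm(ext))
    return False
```

    # A folder named "v1.2" matches the request "v1.2" as a whole, while a
    # file would need label "v1" and extension ".2" to match the same request.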
    def get_pdf_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        file_extension: str,
    ) -> str:
        """
        Get pdf preview of revision of content
        :param content_id: id of content
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content
        :param file_extension: file extension of the file
        :return: preview_path as string
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            if page_number >= self.preview_manager.get_page_nb(file_path, file_ext=file_extension):  # nopep8
                raise PageOfPreviewNotFound(
                    'page_number {page_number} of content {content_id} does not exist'.format(  # nopep8
                        page_number=page_number,
                        content_id=content_id
                    ),
                )
            pdf_preview_path = self.preview_manager.get_pdf_preview(
                file_path,
                page=page_number,
                file_ext=file_extension,
            )
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            ) from exc
        return pdf_preview_path

    def get_full_pdf_preview_path(self, revision_id: int, file_extension: str) -> str:
        """
        Get full (multiple page) pdf preview of revision of content
        :param revision_id: id of revision
        :param file_extension: file extension of the file
        :return: path of the full pdf preview of this revision
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            pdf_preview_path = self.preview_manager.get_pdf_preview(file_path, file_ext=file_extension)  # nopep8
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for revision {}'.format(revision_id)
            ) from exc
        return pdf_preview_path

    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Get jpg preview allowed dimensions and strict bool param.
        """
        return PreviewAllowedDim(
            self._config.PREVIEW_JPG_RESTRICTED_DIMS,
            self._config.PREVIEW_JPG_ALLOWED_DIMS,
        )

    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        file_extension: str,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Get jpg preview of revision of content
        :param content_id: id of content
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content
        :param file_extension: file extension of the file
        :param width: width in pixels
        :param height: height in pixels
        :return: preview_path as string
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            if page_number >= self.preview_manager.get_page_nb(file_path, file_ext=file_extension):  # nopep8
                raise PageOfPreviewNotFound(
                    'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                        page_number=page_number,
                        revision_id=revision_id,
                        content_id=content_id,
                    ),
                )
            if not width and not height:
                width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
                height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height

            allowed_dim = False
            for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS:
                if width == preview_dim.width and height == preview_dim.height:
                    allowed_dim = True
                    break

            if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
                raise PreviewDimNotAllowed(
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
                        width=width,
                        height=height,
                    )
                )
            jpg_preview_path = self.preview_manager.get_jpeg_preview(
                file_path,
                page=page_number,
                width=width,
                height=height,
                file_ext=file_extension,
            )
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            ) from exc
        return jpg_preview_path

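    # NOTE - illustrative sketch only, not part of ContentApi. It isolates the
    # dimension check done in get_jpg_preview_path(): default to the first
    # allowed size, then reject unknown sizes only when restriction is on.
    # PreviewDim, DimNotAllowed and resolve_preview_dim() are hypothetical
    # names standing in for the config objects and exception used above.

```python
from collections import namedtuple

# Hypothetical stand-in for one entry of PREVIEW_JPG_ALLOWED_DIMS.
PreviewDim = namedtuple("PreviewDim", ["width", "height"])


class DimNotAllowed(Exception):
    """Hypothetical stand-in for PreviewDimNotAllowed."""


def resolve_preview_dim(allowed, restricted, width=None, height=None):
    """Return the (width, height) to use, or raise if the size is refused."""
    if not width and not height:
        # No size requested: fall back to the first allowed dimension.
        width, height = allowed[0].width, allowed[0].height
    is_allowed = any(
        width == dim.width and height == dim.height for dim in allowed
    )
    if not is_allowed and restricted:
        raise DimNotAllowed(
            'Size {}x{} is not allowed for jpeg preview'.format(width, height)
        )
    return width, height
```

    # With restriction disabled, any requested size passes through unchanged;
    # the allowed list then only provides the default size.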
    def _get_all_query(
        self,
        parent_ids: typing.List[int] = None,
        content_type_slug: str = content_type_list.Any_SLUG,
        workspace: Workspace = None,
        label: str = None,
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
        complete_path_to_id: int = None,
    ) -> Query:
        """
        Extended filter for better "get all data" query
        :param parent_ids: filter by parent_ids
        :param content_type_slug: filter by content_type slug
        :param workspace: filter by workspace
        :param label: filter by label (case-insensitive substring match)
        :param complete_path_to_id: add all parents (root included) of the
        given content_id to the parent_ids filter.
        :param order_by_properties: order by these properties; each can be
        either an attribute name as a string or an attribute of a sqlalchemy
        Model object (preferred way, QueryableAttribute object)
        :return: Query object
        """
        order_by_properties = order_by_properties or []  # FDV
        assert not parent_ids or isinstance(parent_ids, list)
        assert content_type_slug is not None
        assert not complete_path_to_id or isinstance(complete_path_to_id, int)
        resultset = self._base_query(workspace)

        # INFO - G.M - 2018-11-12 - Get list of all ancestors
        #  of content, workspace root included
        if complete_path_to_id:
            # Work on a copy (or a new list) so that a missing parent_ids
            # does not fail on append() and the caller's list is not mutated.
            parent_ids = list(parent_ids) if parent_ids else []
            content = self.get_one(
                complete_path_to_id,
                content_type_list.Any_SLUG,
                ignore_content_state_filter=True
            )
            if content.parent_id:
                content = content.parent
                while content.parent_id:
                    parent_ids.append(content.content_id)
                    content = content.parent
                parent_ids.append(content.content_id)
            # TODO - G.M - 2018-11-12 - add workspace root to
            # parent_ids list when complete_path_to_id is set
            parent_ids.append(0)

        if content_type_slug != content_type_list.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type_object = content_type_list.get_one_by_slug(content_type_slug)
            all_slug_alias = [content_type_object.slug]
            if content_type_object.slug_alias:
                all_slug_alias.extend(content_type_object.slug_alias)
            resultset = resultset.filter(Content.type.in_(all_slug_alias))

        if parent_ids is False:
            resultset = resultset.filter(Content.parent_id == None)

        if parent_ids:
            # TODO - G.M - 2018-11-09 - Adapt list in order to deal with root
            # case properly
            allowed_parent_ids = []
            allow_root = False
            for parent_id in parent_ids:
                if parent_id == 0:
                    allow_root = True
                else:
                    allowed_parent_ids.append(parent_id)
            if allow_root:
                resultset = resultset.filter(or_(Content.parent_id.in_(allowed_parent_ids), Content.parent_id == None))  # nopep8
            else:
                resultset = resultset.filter(Content.parent_id.in_(allowed_parent_ids))  # nopep8
        if label:
            resultset = resultset.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8

        for _property in order_by_properties:
            resultset = resultset.order_by(_property)

        return resultset

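    # NOTE - illustrative sketch only, not part of ContentApi. It shows how
    # the complete_path_to_id branch above collects ancestor ids, nearest
    # parent first, and then appends 0 as the marker for the workspace root.
    # Node and ancestor_ids_with_root() are hypothetical names; Node only
    # imitates the content_id / parent attributes of Content.

```python
class Node:
    """Minimal stand-in for Content: an id and an optional parent."""

    def __init__(self, content_id, parent=None):
        self.content_id = content_id
        self.parent = parent


def ancestor_ids_with_root(content):
    """Return ids of all ancestors of `content`, then 0 for the root."""
    ids = []
    node = content.parent
    while node is not None:
        ids.append(node.content_id)
        node = node.parent
    # 0 is the value the query code treats as "parent_id IS NULL" (root).
    ids.append(0)
    return ids
```

    # For a content at /1/2/3 this yields [2, 1, 0]; for a content directly
    # at the workspace root it yields [0].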
    def get_all(
            self,
            parent_ids: typing.List[int]=None,
            content_type: str=content_type_list.Any_SLUG,
            workspace: Workspace=None,
            label: str=None,
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
            complete_path_to_id: int = None,
    ) -> typing.List[Content]:
        """
        Return all content using some filters
        :param parent_ids: filter by parent_id
        :param complete_path_to_id: filter by path of content_id
        (add all parents, root included, to the parent_ids filter)
        :param content_type: filter by content_type slug
        :param workspace: filter by workspace
        :param label: filter by label
        :param order_by_properties: order by these properties; each can be
        either an attribute name as a string or an attribute of a sqlalchemy
        Model object (preferred way, QueryableAttribute object)
        :return: List of contents
        """
        order_by_properties = order_by_properties or []  # FDV
        return self._get_all_query(parent_ids, content_type, workspace, label, order_by_properties, complete_path_to_id).all()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
    #     """
    #     Return parent_id children of given content_types
    #     :param parent_id: parent id
    #     :param content_types: list of types
    #     :param workspace: workspace filter
    #     :return: list of content
    #     """
    #     resultset = self._base_query(workspace)
    #     resultset = resultset.filter(Content.type.in_(content_types))
    #
    #     if parent_id:
    #         resultset = resultset.filter(Content.parent_id==parent_id)
    #     if parent_id is False:
    #         resultset = resultset.filter(Content.parent_id == None)
    #
    #     return resultset.all()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # TODO find another name to filter on is_deleted / is_archived
    def get_all_with_filter(self, parent_id: int=None, content_type: str=content_type_list.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        resultset = self._base_query(workspace)

        if content_type != content_type_list.Any_SLUG:
            resultset = resultset.filter(Content.type == content_type)

        resultset = resultset.filter(Content.is_deleted == self._show_deleted)
        resultset = resultset.filter(Content.is_archived == self._show_archived)
        resultset = resultset.filter(Content.is_temporary == self._show_temporary)

        resultset = resultset.filter(Content.parent_id == parent_id)

        return resultset.all()

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
        assert content_type is not None  # DYN_REMOVE

        resultset = self._base_query(workspace)

        if content_type != content_type_list.Any_SLUG:
            resultset = resultset.filter(Content.type == content_type)

        return resultset.all()

    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
            .filter(RevisionReadStatus.user_id == self._user_id)

        not_read_revisions = self._revisions_base_query(workspace) \
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
            .filter(Workspace.is_deleted.is_(False)) \
            .subquery()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
    #     assert content_type is not None# DYN_REMOVE
    #
    #     resultset = self._base_query(workspace)
    #
    #     if content_type != content_type_list.Any_SLUG:
    #         resultset = resultset.filter(Content.type==content_type)
    #
    #     return resultset.all()

    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]=None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        Get the list of contents sorted by last update
        (last modification of the content itself or of one of its comments)
        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: only return contents whose last activity is
        older than that of this content.
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content
        """

        resultset = self._get_all_query(
            workspace=workspace,
        )
        if content_ids:
            resultset = resultset.filter(
                or_(
                    Content.content_id.in_(content_ids),
                    and_(
                        Content.parent_id.in_(content_ids),
                        Content.type == content_type_list.Comment.slug
                    )
                )
            )

        resultset = resultset.order_by(desc(ContentRevisionRO.updated), desc(ContentRevisionRO.revision_id), desc(ContentRevisionRO.content_id))

        active_contents = []
        too_recent_content = []
        before_content_find = False
        for content in resultset:
            related_active_content = None
            if content_type_list.Comment.slug == content.type:
                related_active_content = content.parent
            else:
                related_active_content = content

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            if not self._show_deleted and related_active_content.is_deleted:
                continue
            if not self._show_archived and related_active_content.is_archived:
                continue

            if related_active_content not in active_contents and related_active_content not in too_recent_content:  # nopep8

                if not before_content or before_content_find:
                    active_contents.append(related_active_content)
                else:
                    too_recent_content.append(related_active_content)

                if before_content and related_active_content == before_content:
                    before_content_find = True

            if limit and len(active_contents) >= limit:
                break

        return active_contents

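    # NOTE - illustrative sketch only, not part of ContentApi. It reproduces
    # the loop of get_last_active() on plain data: comments are folded into
    # their parent, duplicates are skipped, and entries seen up to and
    # including `before` are set aside as too recent. last_active() and the
    # (name, parent) tuples are hypothetical; a non-None parent marks a
    # comment here, whereas the real code checks the content type slug.

```python
def last_active(entries, before=None, limit=None):
    """entries: (name, parent_or_None) pairs, most recently updated first."""
    active, too_recent = [], []
    before_found = False
    for name, parent in entries:
        # A comment counts as activity on its parent content.
        related = parent if parent is not None else name
        if related not in active and related not in too_recent:
            if before is None or before_found:
                active.append(related)
            else:
                too_recent.append(related)
            if before is not None and related == before:
                before_found = True
        if limit and len(active) >= limit:
            break
    return active
```

    # Because `before` itself lands in the too-recent list, the result starts
    # with the first content that is strictly older than it.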
1313
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1314
    # usable and efficient for tracim v2 to get content with read/unread status
1315
    # instead of relying on has_new_information_for()
1316
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1317
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1318
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1319
    #     assert content_type is not None# DYN_REMOVE
1320
    #     assert isinstance(content_type, str) # DYN_REMOVE
1321
    #
1322
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1323
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1324
    #
1325
    #     not_read_revisions = self._revisions_base_query(workspace) \
1326
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1327
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1328
    #         .filter(Workspace.is_deleted.is_(False)) \
1329
    #         .subquery()
1330
    #
1331
    #     not_read_content_ids_query = self._session.query(
1332
    #         distinct(not_read_revisions.c.content_id)
1333
    #     )
1334
    #     not_read_content_ids = list(map(
1335
    #         itemgetter(0),
1336
    #         not_read_content_ids_query,
1337
    #     ))
1338
    #
1339
    #     not_read_contents = self._base_query(workspace) \
1340
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1341
    #         .order_by(desc(Content.updated))
1342
    #
1343
    #     if content_type != content_type_list.Any_SLUG:
1344
    #         not_read_contents = not_read_contents.filter(
1345
    #             Content.type==content_type)
1346
    #     else:
1347
    #         not_read_contents = not_read_contents.filter(
1348
    #             Content.type!=content_type_list.Folder.slug)
1349
    #
1350
    #     if parent_id:
1351
    #         not_read_contents = not_read_contents.filter(
1352
    #             Content.parent_id==parent_id)
1353
    #
1354
    #     result = []
1355
    #     for item in not_read_contents:
1356
    #         new_item = None
1357
    #         if content_type_list.Comment.slug == item.type:
1358
    #             new_item = item.parent
1359
    #         else:
1360
    #             new_item = item
1361
    #
1362
    #         # INFO - D.A. - 2015-05-20
1363
    #         # We do not want to show only one item if the last 10 items are
1364
    #         # comments about one thread for example
1365
    #         if new_item not in result:
1366
    #             result.append(new_item)
1367
    #
1368
    #         if len(result) >= limit:
1369
    #             break
1370
    #
1371
    #     return result

    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True,
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        properties = content.properties.copy()
        properties['allowed_content'] = allowed_content_dict
        content.properties = properties

    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :return: nothing
        """
        allowed_content_dict = {}
        for allowed_content_type_slug in allowed_content_type_slug_list:
            if allowed_content_type_slug not in content_type_list.endpoint_allowed_types_slug():
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
            allowed_content_dict[allowed_content_type_slug] = True

        self._set_allowed_content(content, allowed_content_dict)

    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Return to default allowed_content_types
        :param content: the given content instance
        :return: nothing
        """
        if content._properties and 'allowed_content' in content._properties:
            properties = content.properties.copy()
            del properties['allowed_content']
            content.properties = properties

    def set_status(self, content: Content, new_status: str):
        if new_status in content_status_list.get_all_slugs_values():
            content.status = new_status
            content.revision_type = ActionDescription.STATUS_UPDATE
        else:
            raise ValueError('The given value {} is not allowed'.format(new_status))

    def move(self,
             item: Content,
             new_parent: Content,
             must_stay_in_same_workspace: bool=True,
             new_workspace: Workspace=None,
    ):
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
            if new_parent and \
                    new_parent.workspace_id != new_workspace.workspace_id:
                raise WorkspacesDoNotMatch(
                    'new parent workspace and new workspace should be the same.'
                )
        else:
            if new_parent:
                item.workspace = new_parent.workspace

        self._is_filename_available_or_raise(
            item.file_name,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.revision_type = ActionDescription.MOVE
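
    # Illustrative sketch, not part of ContentApi: in move() the second
    # workspace check runs after item.parent and item.workspace have been
    # assigned. The standalone function below shows one way to run both
    # checks before touching the item. Item, check_move_target and the use of
    # ValueError in place of WorkspacesDoNotMatch are assumptions made for
    # this example only.

```python
import typing
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for Content with only the fields used below."""
    workspace_id: int
    parent: typing.Optional['Item'] = None


def check_move_target(
    item: Item,
    new_parent: typing.Optional[Item],
    new_workspace_id: typing.Optional[int] = None,
    must_stay_in_same_workspace: bool = True,
) -> None:
    """Raise ValueError before anything is modified if the target is inconsistent."""
    if new_parent is None:
        # Nothing to compare against: both checks in move() need a parent.
        return
    if must_stay_in_same_workspace and new_parent.workspace_id != item.workspace_id:
        raise ValueError('the item should stay in the same workspace')
    if new_workspace_id is not None and new_parent.workspace_id != new_workspace_id:
        # Stand-in for WorkspacesDoNotMatch in the real module.
        raise ValueError('new parent workspace and new workspace should be the same.')
```

    # Whether the ordering matters in practice depends on how callers handle
    # the session after an exception; the sketch only makes the two checks
    # side-effect free.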

    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revisions included. Children are not
        included, see "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item
        :param new_label: new label of the new copied item
        :param do_save: save the copied item or not
        :param do_notify: notify copy or not
        :return: Newly copied item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something else than value error
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label
        filename = self._prepare_filename(label, item.file_extension)
        self._is_filename_available_or_raise(filename, workspace, parent)
        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.file_name = filename
            rev.revision_type = ActionDescription.COPY
            rev.properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content

    def copy_children(self, origin_content: Content, new_content: Content):
        for child in origin_content.children:
            self.copy(child, new_content)

    def move_recursively(self, item: Content,
                         new_parent: Content, new_workspace: Workspace):
        self.move(item, new_parent, False, new_workspace)
        self.save(item, do_notify=False)

        for child in item.children:
            with new_revision(
                session=self._session,
                tm=transaction.manager,
                content=child
            ):
                self.move_recursively(child, item, new_workspace)
        return

    def is_editable(self, item: Content) -> bool:
        return not item.is_readonly \
               and item.is_active \
               and item.get_status().is_editable()

    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update a non-editable file; change its status or state (deleted/archived) before any change.")  # nopep8
        if item.label == new_label and item.description == new_content:
            # TODO - G.M - 20-03-2018 - Fix internationalization for webdav access.
            # Internationalization disabled in libcontent for now.
            raise SameValueError('The content did not change')
        if not new_label:
            raise EmptyLabelNotAllowed()

        label = new_label or item.label
        filename = self._prepare_filename(label, item.file_extension)
        self._is_filename_available_or_raise(
            filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )

        item.owner = self._user
        item.label = new_label
        item.description = new_content if new_content else item.description # TODO: convert urls into links
        item.revision_type = ActionDescription.EDITION
        return item

    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update a non-editable file; change its status or state (deleted/archived) before any change.")  # nopep8
        # FIXME - G.M - 2018-09-25 - Repair and do a better same-content check.
        # As pyramid hands over a buffered object, the
        # new_content == item.depot_file.file.read() case can never happen;
        # the comparison would need
        # new_content.read() == item.depot_file.file.read().
        # As this can cause trouble with big files, a simple solution based
        # on read() is probably not usable every time.
        # if new_mimetype == item.file_mimetype and \
        #         new_content == item.depot_file.file.read():
        #     raise SameValueError('The content did not change')
        item.owner = self._user
        self._is_filename_available_or_raise(
            new_filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        item.depot_file = FileIntent(
            new_content,
            new_filename,
            new_mimetype,
        )
        item.revision_type = ActionDescription.REVISION
        return item

    def archive(self, content: Content):
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='archived',
            date=current_date_for_filename()
        )
        filename = self._prepare_filename(label, content.file_extension)
        self._is_filename_available_or_raise(
            filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id
        )
        content.file_name = filename
        content.revision_type = ActionDescription.ARCHIVING

    def unarchive(self, content: Content):
        content.owner = self._user
        content.is_archived = False
        content.revision_type = ActionDescription.UNARCHIVING

    def delete(self, content: Content):
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='deleted',
            date=current_date_for_filename()
        )
        filename = self._prepare_filename(label, content.file_extension)
        self._is_filename_available_or_raise(
            filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id
        )
        content.file_name = filename
        content.revision_type = ActionDescription.DELETION

    def undelete(self, content: Content):
        content.owner = self._user
        content.is_deleted = False
        content.revision_type = ActionDescription.UNDELETION

    def get_preview_page_nb(self, revision_id: int, file_extension: str) -> typing.Optional[int]:  # nopep8
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            nb_pages = self.preview_manager.get_page_nb(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            return None
        return nb_pages

    def has_pdf_preview(self, revision_id: int, file_extension: str) -> bool:
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            has_preview = self.preview_manager.has_pdf_preview(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            has_preview = False
        return has_preview

    def has_jpeg_preview(self, revision_id: int, file_extension: str) -> bool:
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            has_preview = self.preview_manager.has_jpeg_preview(
                file_path,
                file_ext=file_extension
            )
        except UnsupportedMimeType:
            has_preview = False
        return has_preview

    def mark_read__all(
            self,
            read_datetime: datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Mark as read the content of all workspaces visible to the user.
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive) # nopep8

    def mark_read__workspace(self,
                       workspace: Workspace,
                       read_datetime: datetime=None,
                       do_flush: bool=True,
                       recursive: bool=True
    ) -> None:
        """
        Mark as read the content of a workspace visible to the user.
        :param workspace: workspace to mark as read (mark_read__all passes
        None here)
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        itemset = self.get_last_active(workspace)
        for item in itemset:
            if item.has_new_information_for(self._user):
                self.mark_read(item, read_datetime, do_flush, recursive)

    def mark_read(
            self,
            content: Content,
            read_datetime: datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> Content:
        """
        Mark the given content as read for the user.
        :param content: content to mark as read
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: the given content
        """
        assert self._user
        assert content

        # The algorithm is:
        # 1. define the read datetime
        # 2. update all revisions related to current Content
        # 3. do the same for all child revisions
        #    (ie parent_id is content_id of current content)

        if not read_datetime:
            read_datetime = datetime.datetime.now()

        viewed_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id==content.content_id).all()

        for revision in viewed_revisions:
            revision.read_by[self._user] = read_datetime

        if recursive:
            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            for child in content.get_valid_children():
                self.mark_read(child, read_datetime=read_datetime,
                               do_flush=False)

            if content_type_list.Comment.slug == content.type:
                self.mark_read(content.parent, read_datetime=read_datetime,
                               do_flush=False, recursive=False)
                for comment in content.parent.get_comments():
                    if comment != content:
                        self.mark_read(comment, read_datetime=read_datetime,
                                       do_flush=False, recursive=False)

        if do_flush:
            self.flush()

        return content

    def mark_unread(self, content: Content, do_flush=True) -> Content:
        assert self._user
        assert content

        revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id==content.content_id).all()

        for revision in revisions:
            try:
                del revision.read_by[self._user]
            except KeyError:
                pass

        for child in content.get_valid_children():
            self.mark_unread(child, do_flush=False)

        if do_flush:
            self.flush()

        return content

    def flush(self):
        self._session.flush()

    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set, one of
        ActionDescription.allowed_values()
        :param do_flush: add the content to the session, flush it and mark
        the content as read
        :param do_notify: notify the content update
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        if not action_description:
            # See if the last action has been modified
            if content.revision_type is None or len(get_history(content.revision, 'revision_type')) <= 0:
                # The action has not been modified, so we set it to default edition
                action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flushes because of the use
            # of triggers for content creation
            #
            # (when creating a content, this is actually an insert of a new
            # revision in content_revisions; so the mark_read operation needs
            # to get the full real data from the database before being
            # prepared.)

            self._session.add(content)
            self._session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of Content
            # This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)

    def do_notify(self, content: Content):
        """
        Force a notification for the given content. By default, it is
        called during the .save() operation
        :param content: content to notify about
        :return: nothing
        """
        NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        ).notify_content_update(content)

    def get_keywords(self, search_string, search_string_separators=None) -> typing.List[str]:
        """
        :param search_string: a string of comma-separated keywords
        :param search_string_separators: regular expression used to split
        search_string (defaults to ContentApi.SEARCH_SEPARATORS)
        :return: a list of str (each keyword = 1 entry)
        """

        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS

        keywords = []
        if search_string:
            keywords = [keyword.strip() for keyword in re.split(search_string_separators, search_string)]

        return keywords
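
    # Illustrative sketch, not part of ContentApi: how the re.split() based
    # splitting in get_keywords() behaves. The value of SEARCH_SEPARATORS is
    # not shown in this listing, so ASSUMED_SEPARATORS below is only a guess
    # used for the example; split_keywords and split_non_empty_keywords are
    # hypothetical helper names.

```python
import re
import typing

# Assumption: stand-in for ContentApi.SEARCH_SEPARATORS (comma or space).
ASSUMED_SEPARATORS = ',| '


def split_keywords(
    search_string: str,
    separators: str = ASSUMED_SEPARATORS,
) -> typing.List[str]:
    """Same splitting logic as get_keywords(), as a free function."""
    if not search_string:
        return []
    return [keyword.strip() for keyword in re.split(separators, search_string)]


def split_non_empty_keywords(
    search_string: str,
    separators: str = ASSUMED_SEPARATORS,
) -> typing.List[str]:
    """Variant that drops the empty strings produced by adjacent separators."""
    return [keyword for keyword in split_keywords(search_string, separators) if keyword]
```

    # With a pattern like the assumed one, 'foo, bar' yields an empty keyword
    # between the comma and the space. An empty keyword passed to search()
    # becomes the pattern '%%', which matches every non-null label, so
    # filtering empties first may be worth considering.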

    def search(self, keywords: typing.List[str]) -> typing.Optional[Query]:
        """
        :param keywords: keywords to look for in content labels and
        descriptions
        :return: a query on the matching Content items, or None if no keyword
        is given
        """

        if len(keywords) <= 0:
            return None

        filter_group_label = list(Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords)
        filter_group_desc = list(Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords)
        title_keyworded_items = self._hard_filtered_base_query().\
            filter(or_(*(filter_group_label+filter_group_desc))).\
            options(joinedload('children_revisions')).\
            options(joinedload('parent'))

        return title_keyworded_items

    def get_all_types(self) -> typing.List[ContentType]:
        labels = content_type_list.endpoint_allowed_types_slug()
        content_types = []
        for label in labels:
            content_types.append(content_type_list.get_one_by_slug(label))

        return content_types

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Update and return the list with contents under an archived/deleted
        parent removed.
        :param contents: List of contents to parse
        """
        for content in contents[:]:
            if self.content_under_deleted(content) or self.content_under_archived(content):
                contents.remove(content)
        return contents

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def content_under_deleted(self, content: Content) -> bool:
        if content.parent:
            if content.parent.is_deleted:
                return True
            if content.parent.parent:
                return self.content_under_deleted(content.parent)
        return False

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def content_under_archived(self, content: Content) -> bool:
        if content.parent:
            if content.parent.is_archived:
                return True
            if content.parent.parent:
                return self.content_under_archived(content.parent)
        return False
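
    # Illustrative sketch, not part of ContentApi: content_under_deleted() and
    # content_under_archived() walk the same parent chain and differ only in
    # the flag they test. The loop below does that walk iteratively for any
    # boolean attribute. Node and has_flagged_ancestor are hypothetical names
    # introduced for this example.

```python
import typing
from dataclasses import dataclass


@dataclass
class Node:
    """Hypothetical stand-in for Content: only the attributes used here."""
    is_deleted: bool = False
    is_archived: bool = False
    parent: typing.Optional['Node'] = None


def has_flagged_ancestor(node: Node, flag: str) -> bool:
    """Return True if any ancestor of node has the given boolean attribute set."""
    ancestor = node.parent
    while ancestor is not None:
        if getattr(ancestor, flag):
            return True
        ancestor = ancestor.parent
    return False
```

    # As in the two methods above, the node's own flag is ignored: only its
    # ancestors are inspected.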

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return the Content that contains the given property.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.
        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace that contains the Content
        :return: Found Content
        """
        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        query = self._base_query(workspace=workspace).filter(
            Content._properties.like(
                '%"{property_name}": "{property_value}"%'.format(
                    property_name=property_name,
                    property_value=property_value,
                )
            )
        )
        return query.one()
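
    # Illustrative sketch, not part of ContentApi: the LIKE pattern above
    # searches the serialized properties text for '"name": "value"'. Assuming
    # the properties are stored as text produced by json.dumps() with its
    # default separators (an assumption, the storage code is not shown here),
    # the example below shows what the pattern can and cannot match. The SQL
    # LIKE '%...%' match is emulated with Python's "in"; like_fragment is a
    # hypothetical helper name.

```python
import json


def like_fragment(property_name: str, property_value: str) -> str:
    """Build the text searched between the two '%' wildcards."""
    return '"{}": "{}"'.format(property_name, property_value)


properties = {'origin': 'mail', 'count': 3}

# json.dumps() defaults put a space after ':' and ','.
default_dump = json.dumps(properties)

# A compact serialization drops those spaces.
compact_dump = json.dumps(properties, separators=(',', ':'))

matches_default = like_fragment('origin', 'mail') in default_dump
matches_compact = like_fragment('origin', 'mail') in compact_dump
matches_number = like_fragment('count', '3') in default_dump
```

    # Under these assumptions the pattern only matches string values written
    # with the default separators, which is one reason the TODO above about a
    # JSON type query seems reasonable.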

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a folder label
        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None)
        :return: Generated folder name
        """
        _ = self.translator.get_translation
        query = self._base_query(workspace=workspace)\
            .filter(Content.label.ilike('{0}%'.format(
                _('New folder'),
            )))
        if parent:
            query = query.filter(Content.parent == parent)

        return _('New folder {0}').format(
            query.count() + 1,
        )
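
    # Illustrative sketch, not part of ContentApi: generate_folder_label()
    # numbers the new folder as "count of matching labels + 1". The pure
    # Python functions below mimic that rule on a plain list of labels and
    # compare it with a hypothetical "first free number" alternative.
    # label_by_count and label_first_free are names made up for this example.

```python
import typing


def label_by_count(existing_labels: typing.List[str], base: str = 'New folder') -> str:
    """Mimic generate_folder_label(): count labels starting with base, add one."""
    count = sum(
        1 for label in existing_labels
        if label.lower().startswith(base.lower())  # case-insensitive, like ilike
    )
    return '{} {}'.format(base, count + 1)


def label_first_free(existing_labels: typing.List[str], base: str = 'New folder') -> str:
    """Hypothetical alternative: take the first numbered label not already used."""
    taken = set(existing_labels)
    index = 1
    while '{} {}'.format(base, index) in taken:
        index += 1
    return '{} {}'.format(base, index)
```

    # If "New folder 2" is no longer counted (for example because it was
    # renamed), the count-based rule proposes "New folder 3" again even though
    # that label is already taken; the caller then relies on the filename
    # availability check to reject it.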