ContentApi.get_last_active()   F
last analyzed

Complexity

Conditions 16

Size

Total Lines 66
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 37
dl 0
loc 66
rs 2.4
c 0
b 0
f 0
cc 16
nop 5

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like backend.tracim_backend.lib.core.content.ContentApi.get_last_active() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import traceback
6
import typing
7
from contextlib import contextmanager
8
9
import sqlalchemy
10
import transaction
11
from depot.io.utils import FileIntent
12
from depot.manager import DepotManager
13
from preview_generator.exception import UnavailablePreviewType
14
from preview_generator.exception import UnsupportedMimeType
15
from preview_generator.manager import PreviewManager
16
from sqlalchemy import desc
17
from sqlalchemy import func
18
from sqlalchemy import or_
19
from sqlalchemy.orm import Query
20
from sqlalchemy.orm import aliased
21
from sqlalchemy.orm import joinedload
22
from sqlalchemy.orm.attributes import QueryableAttribute
23
from sqlalchemy.orm.attributes import get_history
24
from sqlalchemy.orm.exc import NoResultFound
25
from sqlalchemy.orm.session import Session
26
from sqlalchemy.sql.elements import and_
27
28
from tracim_backend.app_models.contents import content_status_list
29
from tracim_backend.app_models.contents import content_type_list
30
from tracim_backend.app_models.contents import FOLDER_TYPE
31
from tracim_backend.app_models.contents import ContentStatus
32
from tracim_backend.app_models.contents import ContentType
33
from tracim_backend.app_models.contents import GlobalStatus
34
from tracim_backend.exceptions import ContentInNotEditableState
35
from tracim_backend.exceptions import ContentFilenameAlreadyUsedInFolder
36
from tracim_backend.exceptions import ContentNotFound
37
from tracim_backend.exceptions import ContentTypeNotExist
38
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
39
from tracim_backend.exceptions import EmptyLabelNotAllowed
40
from tracim_backend.exceptions import PageOfPreviewNotFound
41
from tracim_backend.exceptions import PreviewDimNotAllowed
42
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
43
from tracim_backend.exceptions import SameValueError
44
from tracim_backend.exceptions import TracimUnavailablePreviewType
45
from tracim_backend.exceptions import UnallowedSubContent
46
from tracim_backend.exceptions import UnavailablePreview
47
from tracim_backend.exceptions import WorkspacesDoNotMatch
48
from tracim_backend.lib.core.notifications import NotifierFactory
49
from tracim_backend.lib.utils.logger import logger
50
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
51
from tracim_backend.lib.utils.translation import Translator
52
from tracim_backend.lib.utils.utils import cmp_to_key
53
from tracim_backend.lib.utils.utils import current_date_for_filename
54
from tracim_backend.lib.utils.utils import preview_manager_page_format
55
from tracim_backend.models.auth import User
56
from tracim_backend.models.context_models import ContentInContext
57
from tracim_backend.models.context_models import PreviewAllowedDim
58
from tracim_backend.models.context_models import RevisionInContext
59
from tracim_backend.models.data import ActionDescription
60
from tracim_backend.models.data import Content
61
from tracim_backend.models.data import ContentRevisionRO
62
from tracim_backend.models.data import NodeTreeItem
63
from tracim_backend.models.data import RevisionReadStatus
64
from tracim_backend.models.data import UserRoleInWorkspace
65
from tracim_backend.models.data import Workspace
66
from tracim_backend.models.revision_protection import new_revision
67
68
__author__ = 'damien'
69
70
71
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
72
def compare_content_for_sorting_by_type_and_name(
73
        content1: Content,
74
        content2: Content
75
) -> int:
76
    """
77
    :param content1:
78
    :param content2:
79
    :return:    1 if content1 > content2
80
                -1 if content1 < content2
81
                0 if content1 = content2
82
    """
83
84
    if content1.type == content2.type:
85
        if content1.get_label().lower()>content2.get_label().lower():
86
            return 1
87
        elif content1.get_label().lower()<content2.get_label().lower():
88
            return -1
89
        return 0
90
    else:
91
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
92
        content_type_order = [
93
            content_type_list.Folder.slug,
94
            content_type_list.Page.slug,
95
            content_type_list.Thread.slug,
96
            content_type_list.File.slug,
97
        ]
98
99
        content_1_type_index = content_type_order.index(content1.type)
100
        content_2_type_index = content_type_order.index(content2.type)
101
        result = content_1_type_index - content_2_type_index
102
103
        if result < 0:
104
            return -1
105
        elif result > 0:
106
            return 1
107
        else:
108
            return 0
109
110
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
111
def compare_tree_items_for_sorting_by_type_and_name(
112
        item1: NodeTreeItem,
113
        item2: NodeTreeItem
114
) -> int:
115
    return compare_content_for_sorting_by_type_and_name(item1.node, item2.node)
116
117
118
class ContentApi(object):
119
120
    SEARCH_SEPARATORS = ',| '
121
    SEARCH_DEFAULT_RESULT_NB = 50
122
123
    # DISPLAYABLE_CONTENTS = (
124
    #     content_type_list.Folder.slug,
125
    #     content_type_list.File.slug,
126
    #     content_type_list.Comment.slug,
127
    #     content_type_list.Thread.slug,
128
    #     content_type_list.Page.slug,
129
    #     content_type_list.Page.slugLegacy,
130
    #     ContentType.MarkdownPage,
131
    # )
132
133
    def __init__(
134
            self,
135
            session: Session,
136
            current_user: typing.Optional[User],
137
            config,
138
            show_archived: bool = False,
139
            show_deleted: bool = False,
140
            show_temporary: bool = False,
141
            show_active: bool = True,
142
            all_content_in_treeview: bool = True,
143
            force_show_all_types: bool = False,
144
            disable_user_workspaces_filter: bool = False,
145
    ) -> None:
146
        self._session = session
147
        self._user = current_user
148
        self._config = config
149
        self._user_id = current_user.user_id if current_user else None
150
        self._show_archived = show_archived
151
        self._show_deleted = show_deleted
152
        self._show_temporary = show_temporary
153
        self._show_active = show_active
154
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
155
        self._force_show_all_types = force_show_all_types
156
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
157
        self.preview_manager = PreviewManager(self._config.PREVIEW_CACHE_DIR, create_folder=True)  # nopep8
158
        default_lang = None
159
        if self._user:
160
            default_lang = self._user.lang
161
        if not default_lang:
162
            default_lang = DEFAULT_FALLBACK_LANG
163
        self.translator = Translator(app_config=self._config, default_lang=default_lang)  # nopep8
164
165
    @contextmanager
166
    def show(
167
            self,
168
            show_archived: bool=False,
169
            show_deleted: bool=False,
170
            show_temporary: bool=False,
171
    ) -> typing.Generator['ContentApi', None, None]:
172
        """
173
        Use this method as context manager to update show_archived,
174
        show_deleted and show_temporary properties during context.
175
        :param show_archived: show archived contents
176
        :param show_deleted:  show deleted contents
177
        :param show_temporary:  show temporary contents
178
        """
179
        previous_show_archived = self._show_archived
180
        previous_show_deleted = self._show_deleted
181
        previous_show_temporary = self._show_temporary
182
183
        try:
184
            self._show_archived = show_archived
185
            self._show_deleted = show_deleted
186
            self._show_temporary = show_temporary
187
            yield self
188
        finally:
189
            self._show_archived = previous_show_archived
190
            self._show_deleted = previous_show_deleted
191
            self._show_temporary = previous_show_temporary
192
193
    def get_content_in_context(self, content: Content) -> ContentInContext:
194
        return ContentInContext(content, self._session, self._config, self._user)  # nopep8
195
196
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
197
        # TODO - G.M - 2018-06-173 - create revision in context object
198
        return RevisionInContext(revision, self._session, self._config, self._user) # nopep8
199
    
200
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
201
        """
202
        Return the Content/ContentRevision query join condition
203
        :return: Content/ContentRevision query join condition
204
        """
205
        return and_(Content.id == ContentRevisionRO.content_id,
206
                    ContentRevisionRO.revision_id == self._session.query(
207
                        ContentRevisionRO.revision_id)
208
                    .filter(ContentRevisionRO.content_id == Content.id)
209
                    .order_by(ContentRevisionRO.revision_id.desc())
210
                    .limit(1)
211
                    .correlate(Content))
212
213
    def get_canonical_query(self) -> Query:
214
        """
215
        Return the Content/ContentRevision base query who join these table on the last revision.
216
        :return: Content/ContentRevision Query
217
        """
218
        return self._session.query(Content)\
219
            .join(ContentRevisionRO, self._get_revision_join())
220
221
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
222
    @classmethod
223
    def sort_tree_items(
224
        cls,
225
        content_list: typing.List[NodeTreeItem],
226
    )-> typing.List[NodeTreeItem]:
227
        news = []
228
        for item in content_list:
229
            news.append(item)
230
231
        content_list.sort(key=cmp_to_key(
232
            compare_tree_items_for_sorting_by_type_and_name,
233
        ))
234
235
        return content_list
236
237
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
238
    @classmethod
239
    def sort_content(
240
        cls,
241
        content_list: typing.List[Content],
242
    ) -> typing.List[Content]:
243
        content_list.sort(key=cmp_to_key(compare_content_for_sorting_by_type_and_name))
244
        return content_list
245
246
    def __real_base_query(
247
        self,
248
        workspace: Workspace = None,
249
    ) -> Query:
250
        result = self.get_canonical_query()
251
252
        # Exclude non displayable types
253
        if not self._force_show_all_types:
254
            result = result.filter(Content.type.in_(content_type_list.query_allowed_types_slugs()))
255
256
        if workspace:
257
            result = result.filter(Content.workspace_id == workspace.workspace_id)
258
259
        # Security layer: if user provided, filter
260
        # with user workspaces privileges
261
        if self._user and not self._disable_user_workspaces_filter:
262
            user = self._session.query(User).get(self._user_id)
263
            # Filter according to user workspaces
264
            workspace_ids = [r.workspace_id for r in user.roles \
265
                             if r.role>=UserRoleInWorkspace.READER]
266
            result = result.filter(or_(
267
                Content.workspace_id.in_(workspace_ids),
268
                # And allow access to non workspace document when he is owner
269
                and_(
270
                    Content.workspace_id == None,
271
                    Content.owner_id == self._user_id,
272
                )
273
            ))
274
275
        return result
276
277
    def _base_query(self, workspace: Workspace=None) -> Query:
278
        result = self.__real_base_query(workspace)
279
280
        if not self._show_active:
281
            result = result.filter(or_(
282
                Content.is_deleted==True,
283
                Content.is_archived==True,
284
            ))
285
        if not self._show_deleted:
286
            result = result.filter(Content.is_deleted==False)
287
288
        if not self._show_archived:
289
            result = result.filter(Content.is_archived==False)
290
291
        if not self._show_temporary:
292
            result = result.filter(Content.is_temporary==False)
293
294
        return result
295
296
    def __revisions_real_base_query(
297
        self,
298
        workspace: Workspace=None,
299
    ) -> Query:
300
        result = self._session.query(ContentRevisionRO)
301
302
        # Exclude non displayable types
303
        if not self._force_show_all_types:
304
            result = result.filter(Content.type.in_(self.DISPLAYABLE_CONTENTS))
305
306
        if workspace:
307
            result = result.filter(ContentRevisionRO.workspace_id==workspace.workspace_id)
308
309
        if self._user:
310
            user = self._session.query(User).get(self._user_id)
311
            # Filter according to user workspaces
312
            workspace_ids = [r.workspace_id for r in user.roles \
313
                             if r.role>=UserRoleInWorkspace.READER]
314
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))
315
316
        return result
317
318
    def _revisions_base_query(
319
        self,
320
        workspace: Workspace=None,
321
    ) -> Query:
322
        result = self.__revisions_real_base_query(workspace)
323
324
        if not self._show_deleted:
325
            result = result.filter(ContentRevisionRO.is_deleted==False)
326
327
        if not self._show_archived:
328
            result = result.filter(ContentRevisionRO.is_archived==False)
329
330
        if not self._show_temporary:
331
            result = result.filter(Content.is_temporary==False)
332
333
        return result
334
335
    def _hard_filtered_base_query(
336
        self,
337
        workspace: Workspace=None,
338
    ) -> Query:
339
        """
340
        If set to True, then filterign on is_deleted and is_archived will also
341
        filter parent properties. This is required for search() function which
342
        also search in comments (for example) which may be 'not deleted' while
343
        the associated content is deleted
344
345
        :param hard_filtering:
346
        :return:
347
        """
348
        result = self.__real_base_query(workspace)
349
350
        if not self._show_deleted:
351
            parent = aliased(Content)
352
            result = result.join(parent, Content.parent).\
353
                filter(Content.is_deleted==False).\
354
                filter(parent.is_deleted==False)
355
356
        if not self._show_archived:
357
            parent = aliased(Content)
358
            result = result.join(parent, Content.parent).\
359
                filter(Content.is_archived==False).\
360
                filter(parent.is_archived==False)
361
362
        if not self._show_temporary:
363
            parent = aliased(Content)
364
            result = result.join(parent, Content.parent). \
365
                filter(Content.is_temporary == False). \
366
                filter(parent.is_temporary == False)
367
368
        return result
369
370
    def get_base_query(
371
        self,
372
        workspace: Workspace,
373
    ) -> Query:
374
        return self._base_query(workspace)
375
376
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
377
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
378
    #     """
379
    #     This method returns child items (folders or items) for left bar treeview.
380
    # 
381
    #     :param parent:
382
    #     :param workspace:
383
    #     :param filter_by_allowed_content_types:
384
    #     :param removed_item_ids:
385
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
386
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
387
    #     :return:
388
    #     """
389
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
390
    #     removed_item_ids = removed_item_ids or []  # FDV
391
    # 
392
    #     if not allowed_node_types:
393
    #         allowed_node_types = [content_type_list.Folder.slug]
394
    #     elif allowed_node_types==content_type_list.Any_SLUG:
395
    #         allowed_node_types = ContentType.all()
396
    # 
397
    #     parent_id = parent.content_id if parent else None
398
    #     folders = self._base_query(workspace).\
399
    #         filter(Content.parent_id==parent_id).\
400
    #         filter(Content.type.in_(allowed_node_types)).\
401
    #         filter(Content.content_id.notin_(removed_item_ids)).\
402
    #         all()
403
    # 
404
    #     if not filter_by_allowed_content_types or \
405
    #                     len(filter_by_allowed_content_types)<=0:
406
    #         # Standard case for the left treeview: we want to show all contents
407
    #         # in the left treeview... so we still filter because for example
408
    #         # comments must not appear in the treeview
409
    #         return [folder for folder in folders \
410
    #                 if folder.type in ContentType.allowed_types_for_folding()]
411
    # 
412
    #     # Now this is a case of Folders only (used for moving content)
413
    #     # When moving a content, you must get only folders that allow to be filled
414
    #     # with the type of content you want to move
415
    #     result = []
416
    #     for folder in folders:
417
    #         for allowed_content_type in filter_by_allowed_content_types:
418
    # 
419
    #             is_folder = folder.type == content_type_list.Folder.slug
420
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
421
    # 
422
    #             if is_folder and content_type__allowed:
423
    #                 result.append(folder)
424
    #                 break
425
    # 
426
    #     return result
427
428
    def _is_filename_available(
429
            self,
430
            filename: str,
431
            workspace: Workspace,
432
            parent: Content = None,
433
            exclude_content_id: int = None,
434
    ) -> bool:
435
        """
436
        Check if content label is free
437
        :param filename: content label
438
        :param workspace: workspace of the content
439
        :param parent: parent of the content
440
        :param exclude_content_id: exclude a specific content_id (useful
441
        to verify  other content for content update)
442
        :return: True if content label is available
443
        """
444
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
445
        # with empty filename like comment.
446
        assert filename
447
        assert workspace
448
        label, file_extension = os.path.splitext(filename)
449
        query = self.get_base_query(workspace)
450
451
        if parent:
452
            query = query.filter(Content.parent_id == parent.content_id)
453
        else:
454
            query = query.filter(Content.parent_id == None)
455
456
        if exclude_content_id:
457
            query = query.filter(Content.content_id != exclude_content_id)
458
        query = query.filter(Content.workspace_id == workspace.workspace_id)
459
460
461
        nb_content_with_the_filename = query.filter(
462
            Content.file_name == filename
463
        ).count()
464
        if nb_content_with_the_filename == 0:
465
            return True
466
        elif nb_content_with_the_filename == 1:
467
            return False
468
        else:
469
            critical_error_text = 'Something is wrong in the database ! '\
470
                                  'Content filename should be unique ' \
471
                                  'in a same folder in database' \
472
                                  'but you have {nb} content with ' \
473
                                  'filename {filename} ' \
474
                                  'in workspace {workspace_id}'
475
476
            critical_error_text = critical_error_text.format(
477
                nb=nb_content_with_the_filename,
478
                filename=filename,
479
                workspace_id=workspace.workspace_id,
480
            )
481
            if parent:
482
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
483
                    text=critical_error_text,
484
                    parent_id=parent.parent_id
485
                )
486
            logger.critical(self, critical_error_text)
487
            return False
488
489
    def _prepare_filename(
490
            self,
491
            label: str,
492
            file_extension: str,
493
    ) -> str:
494
        """
495
        generate correct file_name from label and file_extension
496
        :return: valid
497
        """
498
        # TODO - G.M - 2018-10-11 - Find a way to
499
        # Refactor this in order to use same method
500
        # in both contentLib and .file_name of content
501
        return '{label}{file_extension}'.format(
502
            label=label,
503
            file_extension=file_extension
504
        )
505
506
    def _is_filename_available_or_raise(
507
        self,
508
        filename: str,
509
        workspace: Workspace,
510
        parent: Content = None,
511
        exclude_content_id: int = None,
512
    ) -> bool:
513
        """
514
        Same as _is_filename_available but raise exception instead of
515
        returning boolean if content filename is already used
516
        """
517
        if self._is_filename_available(
518
            filename,
519
            workspace,
520
            parent,
521
            exclude_content_id
522
        ):
523
            return True
524
        # INFO - G.M - 2018-10-11 - prepare exception message
525
        exception_message = 'A Content already exist with the same filename ' \
526
            '{filename}  in workspace {workspace_id}'
527
        exception_message = exception_message.format(
528
            filename=filename,
529
            workspace_id=workspace.workspace_id,
530
        )
531
        if parent:
532
            exception_message = '{text} and parent as content {parent_id}'.format(
533
                text=exception_message,
534
                parent_id=parent.parent_id
535
            )
536
        raise ContentFilenameAlreadyUsedInFolder(exception_message)
537
538
    def create(self, content_type_slug: str, workspace: Workspace, parent: Content=None, label: str = '', filename: str = '', do_save=False, is_temporary: bool=False, do_notify=True) -> Content:
539
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
540
        assert content_type_slug != content_type_list.Any_SLUG
541
        assert not (label and filename)
542
543
        if content_type_slug == FOLDER_TYPE and not label:
544
            label = self.generate_folder_label(workspace, parent)
545
546
        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
547
        # can call here with None. Must update create_comment tu require the
548
        # workspace.
549
        if not workspace:
550
            workspace = parent.workspace
551
552
        content_type = content_type_list.get_one_by_slug(content_type_slug)
553
        if parent and parent.properties and 'allowed_content' in parent.properties:
554
            if content_type.slug not in parent.properties['allowed_content'] or not parent.properties['allowed_content'][content_type.slug]:
555
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
556
                    subcontent_type=content_type.slug,
557
                    content_id=parent.content_id,
558
                ))
559
        if not workspace and parent:
560
            workspace = parent.workspace
561
562
        if workspace:
563
            if content_type.slug not in workspace.get_allowed_content_types():
564
                raise UnallowedSubContent(
565
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
566
                        subcontent_type=content_type.slug,
567
                        content_id=workspace.workspace_id,
568
                    )
569
                )
570
        content = Content()
571
        if label:
572
            file_extension = ''
573
            if content_type.file_extension:
574
                file_extension = content_type.file_extension
575
            filename = self._prepare_filename(label, file_extension)
576
            self._is_filename_available_or_raise(
577
                filename,
578
                workspace,
579
                parent,
580
            )
581
            # TODO - G.M - 2018-10-15 - Set file extension and label
582
            # explicitly instead of filename in order to have correct
583
            # label/file-extension separation.
584
            content.label = label
585
            content.file_extension = file_extension
586
        elif filename:
587
            self._is_filename_available_or_raise(
588
                filename,
589
                workspace,
590
                parent
591
            )
592
            # INFO - G.M - 2018-07-04 - File_name setting automatically
593
            # set label and file_extension
594
            content.file_name = filename
595
        else:
596
            if content_type_slug == content_type_list.Comment.slug:
597
                # INFO - G.M - 2018-07-16 - Default label for comments is
598
                # empty string.
599
                content.label = ''
600
            else:
601
                raise EmptyLabelNotAllowed(
602
                    'Content of type {} should have a valid label'.format(content_type_slug)  # nopep8
603
                )
604
605
        content.owner = self._user
606
        content.parent = parent
607
608
        content.workspace = workspace
609
        content.type = content_type.slug
610
        content.is_temporary = is_temporary
611
        content.revision_type = ActionDescription.CREATION
612
613
        if do_save:
614
            self._session.add(content)
615
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
616
        return content
617
618
    def create_comment(self, workspace: Workspace=None, parent: Content=None, content:str ='', do_save=False, do_notify=True) -> Content:
619
        # TODO: check parent allowed_type and workspace allowed_ type
620
        assert parent and parent.type != FOLDER_TYPE
621
        if not self.is_editable(parent):
622
            raise ContentInNotEditableState(
623
                "Can't create comment on content, you need to change his status or state (deleted/archived) before any change."
624
            )
625
        if not content:
626
            raise EmptyCommentContentNotAllowed()
627
628
        item = self.create(
629
            content_type_slug=content_type_list.Comment.slug,
630
            workspace=workspace,
631
            parent=parent,
632
            do_notify=False,
633
            do_save=False,
634
            label='',
635
        )
636
        item.description = content
637
        item.revision_type = ActionDescription.COMMENT
638
639
        if do_save:
640
            self.save(item, ActionDescription.COMMENT, do_notify=do_notify)
641
        return item
642
643
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
644
        """
645
        This method is a hack to convert a node revision item into a node
646
        :param content_id:
647
        :param content_type:
648
        :param workspace:
649
        :param revision_id:
650
        :return:
651
        """
652
653
        content = self.get_one(content_id, content_type, workspace)
654
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id==revision_id).one()
655
656
        if revision.content_id==content.content_id:
657
            content.revision_to_serialize = revision.revision_id
658
        else:
659
            raise ValueError('Revision not found for given content')
660
661
        return content
662
663
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
664
665
        if not content_id:
666
            return None
667
668
        base_request = self._base_query(workspace).filter(Content.content_id==content_id)
669
670
        if content_type!=content_type_list.Any_SLUG:
671
            base_request = base_request.filter(Content.type==content_type)
672
673
        if parent:
674
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
675
676
        try:
677
            content = base_request.one()
678
        except NoResultFound as exc:
679
            # TODO - G.M - 2018-07-16 - Add better support for all different
680
            # error case who can happened here
681
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
682
            # wrong access to this workspace, wrong base filter according
683
            # to content_status.
684
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
685
        return content
686
687
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
688
        """
689
        This method allow us to get directly any revision with its id
690
        :param revision_id: The content's revision's id that we want to return
691
        :param content: The content related to the revision, if None do not
692
        check if revision is related to this content.
693
        :return: An item Content linked with the correct revision
694
        """
695
        assert revision_id is not None# DYN_REMOVE
696
697
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
698
        if content and revision.content_id != content.content_id:
699
            raise RevisionDoesNotMatchThisContent(
700
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
701
                    revision_id=revision.revision_id,
702
                    content_id=content.content_id,
703
                    )
704
            )
705
        return revision
706
707
    # INFO - A.P - 2017-07-03 - python file object getter
708
    # in case of we cook a version of preview manager that allows a pythonic
709
    # access to files
710
    # def get_one_revision_file(self, revision_id: int = None):
711
    #     """
712
    #     This function allows us to directly get a Python file object from its
713
    #     revision identifier.
714
    #     :param revision_id: The revision id of the file we want to return
715
    #     :return: The corresponding Python file object
716
    #     """
717
    #     revision = self.get_one_revision(revision_id)
718
    #     return DepotManager.get().get(revision.depot_file)
719
720
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
721
        """
722
        This method allows us to directly get a file path from its revision
723
        identifier.
724
        :param revision_id: The revision id of the filepath we want to return
725
        :return: The corresponding filepath
726
        """
727
        revision = self.get_one_revision(revision_id)
728
        depot = DepotManager.get()
729
        depot_stored_file = depot.get(revision.depot_file)  # type: StoredFile
730
        depot_file_path = depot_stored_file._file_path  # type: str
731
        return depot_file_path
732
733
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
734
    def get_one_by_label_and_parent(
735
            self,
736
            content_label: str,
737
            content_parent: Content=None,
738
    ) -> Content:
739
        """
740
        This method let us request the database to obtain a Content with its name and parent
741
        :param content_label: Either the content's label or the content's filename if the label is None
742
        :param content_parent: The parent's content
743
        :param workspace: The workspace's content
744
        :return The corresponding Content
745
        """
746
        workspace = content_parent.workspace if content_parent else None
747
        query = self._base_query(workspace)
748
        parent_id = content_parent.content_id if content_parent else None
749
        query = query.filter(Content.parent_id == parent_id)
750
751
        file_name, file_extension = os.path.splitext(content_label)
752
753
        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
754
        # rewritten in order to avoid content_type hardcoded code there
755
        return query.filter(
756
            or_(
757
                and_(
758
                    Content.type == content_type_list.File.slug,
759
                    Content.label == file_name,
760
                    Content.file_extension == file_extension,
761
                ),
762
                and_(
763
                    Content.type == content_type_list.Thread.slug,
764
                    Content.label == file_name,
765
                ),
766
                and_(
767
                    Content.type == content_type_list.Page.slug,
768
                    Content.label == file_name,
769
                ),
770
                and_(
771
                    Content.type == content_type_list.Folder.slug,
772
                    Content.label == content_label,
773
                ),
774
            )
775
        ).one()
776
777
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
778
    def get_one_by_label_and_parent_labels(
779
            self,
780
            content_label: str,
781
            workspace: Workspace,
782
            content_parent_labels: [str]=None,
783
    ):
784
        """
785
        Return content with it's label, workspace and parents labels (optional)
786
        :param content_label: label of content (label or file_name)
787
        :param workspace: workspace containing all of this
788
        :param content_parent_labels: Ordered list of labels representing path
789
            of folder (without workspace label).
790
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
791
        :return: Found Content
792
        """
793
        query = self._base_query(workspace)
794
        parent_folder = None
795
796
        # Grab content parent folder if parent path given
797
        if content_parent_labels:
798
            parent_folder = self.get_folder_with_workspace_path_labels(
799
                content_parent_labels,
800
                workspace,
801
            )
802
803
        # Build query for found content by label
804
        content_query = self.filter_query_for_content_label_as_path(
805
            query=query,
806
            content_label_as_file=content_label,
807
        )
808
809
        # Modify query to apply parent folder filter if any
810
        if parent_folder:
811
            content_query = content_query.filter(
812
                Content.parent_id == parent_folder.content_id,
813
            )
814
        else:
815
            content_query = content_query.filter(
816
                Content.parent_id == None,
817
            )
818
819
        # Filter with workspace
820
        content_query = content_query.filter(
821
            Content.workspace_id == workspace.workspace_id,
822
        )
823
824
        # Return the content
825
        return content_query\
826
            .order_by(
827
                Content.revision_id.desc(),
828
            )\
829
            .one()
830
831
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
832
    def get_folder_with_workspace_path_labels(
833
            self,
834
            path_labels: [str],
835
            workspace: Workspace,
836
    ) -> Content:
837
        """
838
        Return a Content folder for given relative path.
839
        TODO BS 20161124: Not safe if web interface allow folder duplicate names
840
        :param path_labels: List of labels representing path of folder
841
        (without workspace label).
842
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
843
        :param workspace: workspace of folders
844
        :return: Content folder
845
        """
846
        query = self._base_query(workspace)
847
        folder = None
848
849
        for label in path_labels:
850
            # Filter query on label
851
            folder_query = query \
852
                .filter(
853
                Content.type == content_type_list.Folder.slug,
854
                Content.label == label,
855
                Content.workspace_id == workspace.workspace_id,
856
                )
857
858
            # Search into parent folder (if already deep)
859
            if folder:
860
                folder_query = folder_query\
861
                    .filter(
862
                        Content.parent_id == folder.content_id,
863
                    )
864
            else:
865
                folder_query = folder_query \
866
                    .filter(Content.parent_id == None)
867
868
            # Get thirst corresponding folder
869
            folder = folder_query \
870
                .order_by(Content.revision_id.desc()) \
871
                .one()
872
873
        return folder
874
875
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
876
    def filter_query_for_content_label_as_path(
877
            self,
878
            query: Query,
879
            content_label_as_file: str,
880
            is_case_sensitive: bool = False,
881
    ) -> Query:
882
        """
883
        Apply normalised filters to found Content corresponding as given label.
884
        :param query: query to modify
885
        :param content_label_as_file: label in this
886
        FILE version, use Content.file_name.
887
        :param is_case_sensitive: Take care about case or not
888
        :return: modified query
889
        """
890
        file_name, file_extension = os.path.splitext(content_label_as_file)
891
892
        label_filter = Content.label == content_label_as_file
893
        file_name_filter = Content.label == file_name
894
        file_extension_filter = Content.file_extension == file_extension
895
896
        if not is_case_sensitive:
897
            label_filter = func.lower(Content.label) == \
898
                           func.lower(content_label_as_file)
899
            file_name_filter = func.lower(Content.label) == \
900
                               func.lower(file_name)
901
            file_extension_filter = func.lower(Content.file_extension) == \
902
                                    func.lower(file_extension)
903
904
        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
905
        # rewritten in order to avoid content_type hardcoded code there
906
        return query.filter(or_(
907
            and_(
908
                Content.type == content_type_list.File.slug,
909
                file_name_filter,
910
                file_extension_filter,
911
            ),
912
            and_(
913
                Content.type == content_type_list.Thread.slug,
914
                file_name_filter,
915
                file_extension_filter,
916
            ),
917
            and_(
918
                Content.type == content_type_list.Page.slug,
919
                file_name_filter,
920
                file_extension_filter,
921
            ),
922
            and_(
923
                Content.type == content_type_list.Folder.slug,
924
                label_filter,
925
            ),
926
        ))
927
928
    def get_pdf_preview_path(
929
        self,
930
        content_id: int,
931
        revision_id: int,
932
        page_number: int,
933
        file_extension: str,
934
    ) -> str:
935
        """
936
        Get pdf preview of revision of content
937
        :param content_id: id of content
938
        :param revision_id: id of content revision
939
        :param page_number: page number of the preview, useful for multipage
940
        content
941
        :param file_extension: file extension of the file
942
        :return: preview_path as string
943
        """
944
        file_path = self.get_one_revision_filepath(revision_id)
945
        try:
946
            page_number = preview_manager_page_format(page_number)
947
            if page_number >= self.preview_manager.get_page_nb(file_path, file_ext=file_extension):  # nopep8
948
                raise PageOfPreviewNotFound(
949
                    'page_number {page_number} of content {content_id} does not exist'.format(  # nopep8
950
                        page_number=page_number,
951
                        content_id=content_id
952
                    ),
953
                )
954
            pdf_preview_path = self.preview_manager.get_pdf_preview(
955
                file_path,
956
                page=page_number,
957
                file_ext=file_extension,
958
            )
959
        except PageOfPreviewNotFound as exc:
960
            # passthrough as this exception is already supported with
961
            # specific error code.
962
            raise exc
963
        except UnavailablePreviewType as exc:
964
            raise TracimUnavailablePreviewType() from exc
965
        except UnsupportedMimeType as exc:
966
            raise UnavailablePreview(
967
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
968
            ) from exc
969
        except Exception as exc:
970
            logger.warning(
971
                self,
972
                "Unknown Preview_Generator Exception Occured : {}".format(str(exc))
973
            )
974
            logger.warning(self, traceback.format_exc())
975
            raise UnavailablePreview(
976
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
977
            ) from exc
978
        return pdf_preview_path
979
980
    def get_full_pdf_preview_path(self, revision_id: int, file_extension: str) -> str:
981
        """
982
        Get full(multiple page) pdf preview of revision of content
983
        :param revision_id: id of revision
984
                :param file_extension: file extension of the file
985
        :return: path of the full pdf preview of this revision
986
        """
987
        file_path = self.get_one_revision_filepath(revision_id)
988
        try:
989
            pdf_preview_path = self.preview_manager.get_pdf_preview(file_path, file_ext=file_extension)  # nopep8
990
        except UnavailablePreviewType as exc:
991
            raise TracimUnavailablePreviewType() from exc
992
        except UnsupportedMimeType as exc:
993
            raise UnavailablePreview(
994
                'No preview available for revision {}'.format(revision_id)
995
            ) from exc
996
        except Exception as exc:
997
            logger.warning(
998
                self,
999
                "Unknown Preview_Generator Exception Occured : {}".format(str(exc))
1000
            )
1001
            logger.warning(self, traceback.format_exc())
1002
            raise UnavailablePreview(
1003
                'No preview available for revision {}'.format(revision_id)
1004
            ) from exc
1005
        return pdf_preview_path
1006
1007
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
1008
        """
1009
        Get jpg preview allowed dimensions and strict bool param.
1010
        """
1011
        return PreviewAllowedDim(
1012
            self._config.PREVIEW_JPG_RESTRICTED_DIMS,
1013
            self._config.PREVIEW_JPG_ALLOWED_DIMS,
1014
        )
1015
1016
    def get_jpg_preview_path(
1017
        self,
1018
        content_id: int,
1019
        revision_id: int,
1020
        page_number: int,
1021
        file_extension: str,
1022
        width: int = None,
1023
        height: int = None,
1024
    ) -> str:
1025
        """
1026
        Get jpg preview of revision of content
1027
        :param content_id: id of content
1028
        :param revision_id: id of content revision
1029
        :param page_number: page number of the preview, useful for multipage
1030
        content
1031
        :param file_extension: file extension of the file
1032
        :param width: width in pixel
1033
        :param height: height in pixel
1034
        :return: preview_path as string
1035
        """
1036
        file_path = self.get_one_revision_filepath(revision_id)
1037
        try:
1038
            page_number = preview_manager_page_format(page_number)
1039
            if page_number >= self.preview_manager.get_page_nb(file_path, file_ext=file_extension):  # nopep8
1040
                raise PageOfPreviewNotFound(
1041
                    'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
1042
                        page_number=page_number,
1043
                        revision_id=revision_id,
1044
                        content_id=content_id,
1045
                    ),
1046
                )
1047
            if not width and not height:
1048
                width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
1049
                height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height
1050
1051
            allowed_dim = False
1052
            for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS:
1053
                if width == preview_dim.width and height == preview_dim.height:
1054
                    allowed_dim = True
1055
                    break
1056
1057
            if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
1058
                raise PreviewDimNotAllowed(
1059
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
1060
                        width=width,
1061
                        height=height,
1062
                    )
1063
                )
1064
            jpg_preview_path = self.preview_manager.get_jpeg_preview(
1065
                file_path,
1066
                page=page_number,
1067
                width=width,
1068
                height=height,
1069
                file_ext=file_extension,
1070
            )
1071
        except (PreviewDimNotAllowed, PageOfPreviewNotFound) as exc:
1072
            # passthrough as those exceptions are already supported with
1073
            # specific error code.
1074
            raise exc
1075
        except UnsupportedMimeType as exc:
1076
            raise UnavailablePreview(
1077
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
1078
            ) from exc
1079
        except Exception as exc:
1080
            logger.warning(
1081
                self,
1082
                "Unknown Preview_Generator Exception Occured : {}".format(str(exc))
1083
            )
1084
            logger.warning(self, traceback.format_exc())
1085
            raise UnavailablePreview(
1086
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
1087
            ) from exc
1088
        return jpg_preview_path
1089
1090
    def _get_all_query(
1091
        self,
1092
        parent_id: int = None,
1093
        content_type_slug: str = content_type_list.Any_SLUG,
1094
        workspace: Workspace = None,
1095
        label:str = None,
1096
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
1097
    ) -> Query:
1098
        """
1099
        Extended filter for better "get all data" query
1100
        :param parent_id: filter by parent_id
1101
        :param content_type_slug: filter by content_type slug
1102
        :param workspace: filter by workspace
1103
        :param order_by_properties: filter by properties can be both string of
1104
        attribute or attribute of Model object from sqlalchemy(preferred way,
1105
        QueryableAttribute object)
1106
        :return: Query object
1107
        """
1108
        order_by_properties = order_by_properties or []  # FDV
1109
        assert parent_id is None or isinstance(parent_id, int)
1110
        assert content_type_slug is not None
1111
        resultset = self._base_query(workspace)
1112
1113
        if content_type_slug != content_type_list.Any_SLUG:
1114
            # INFO - G.M - 2018-07-05 - convert with
1115
            #  content type object to support legacy slug
1116
            content_type_object = content_type_list.get_one_by_slug(content_type_slug)
1117
            all_slug_alias = [content_type_object.slug]
1118
            if content_type_object.slug_alias:
1119
                all_slug_alias.extend(content_type_object.slug_alias)
1120
            resultset = resultset.filter(Content.type.in_(all_slug_alias))
1121
1122
        if parent_id:
1123
            resultset = resultset.filter(Content.parent_id==parent_id)
1124
        if parent_id == 0 or parent_id is False:
1125
            resultset = resultset.filter(Content.parent_id == None)
1126
        if label:
1127
            resultset = resultset.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8
1128
1129
        for _property in order_by_properties:
1130
            resultset = resultset.order_by(_property)
1131
1132
        return resultset
1133
1134
    def get_all(
1135
            self,
1136
            parent_id: int=None,
1137
            content_type: str=content_type_list.Any_SLUG,
1138
            workspace: Workspace=None,
1139
            label: str=None,
1140
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
1141
    ) -> typing.List[Content]:
1142
        """
1143
        Return all content using some filters
1144
        :param parent_id: filter by parent_id
1145
        :param content_type: filter by content_type slug
1146
        :param workspace: filter by workspace
1147
        :param order_by_properties: filter by properties can be both string of
1148
        attribute or attribute of Model object from sqlalchemy(preferred way,
1149
        QueryableAttribute object)
1150
        :return: List of contents
1151
        """
1152
        order_by_properties = order_by_properties or []  # FDV
1153
        return self._get_all_query(parent_id, content_type, workspace, label, order_by_properties).all()
1154
1155
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1156
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
1157
    #     """
1158
    #     Return parent_id childs of given content_types
1159
    #     :param parent_id: parent id
1160
    #     :param content_types: list of types
1161
    #     :param workspace: workspace filter
1162
    #     :return: list of content
1163
    #     """
1164
    #     resultset = self._base_query(workspace)
1165
    #     resultset = resultset.filter(Content.type.in_(content_types))
1166
    #
1167
    #     if parent_id:
1168
    #         resultset = resultset.filter(Content.parent_id==parent_id)
1169
    #     if parent_id is False:
1170
    #         resultset = resultset.filter(Content.parent_id == None)
1171
    #
1172
    #     return resultset.all()
1173
1174
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1175
    # TODO find an other name to filter on is_deleted / is_archived
1176
    def get_all_with_filter(self, parent_id: int=None, content_type: str=content_type_list.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
1177
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1178
        assert content_type is not None# DYN_REMOVE
1179
        assert isinstance(content_type, str) # DYN_REMOVE
1180
1181
        resultset = self._base_query(workspace)
1182
1183
        if content_type != content_type_list.Any_SLUG:
1184
            resultset = resultset.filter(Content.type==content_type)
1185
1186
        resultset = resultset.filter(Content.is_deleted == self._show_deleted)
1187
        resultset = resultset.filter(Content.is_archived == self._show_archived)
1188
        resultset = resultset.filter(Content.is_temporary == self._show_temporary)
1189
1190
        resultset = resultset.filter(Content.parent_id==parent_id)
1191
1192
        return resultset.all()
1193
1194
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1195
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1196
        assert content_type is not None# DYN_REMOVE
1197
1198
        resultset = self._base_query(workspace)
1199
1200
        if content_type != content_type_list.Any_SLUG:
1201
            resultset = resultset.filter(Content.type==content_type)
1202
1203
        return resultset.all()
1204
1205
    def get_last_unread(self, parent_id: int, content_type: str,
1206
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
1207
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1208
        assert content_type is not None# DYN_REMOVE
1209
        assert isinstance(content_type, str) # DYN_REMOVE
1210
1211
        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1212
            .filter(RevisionReadStatus.user_id==self._user_id)
1213
1214
        not_read_revisions = self._revisions_base_query(workspace) \
1215
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1216
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1217
            .filter(Workspace.is_deleted.is_(False)) \
1218
            .subquery()
1219
1220
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1221
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1222
    #     assert content_type is not None# DYN_REMOVE
1223
    #
1224
    #     resultset = self._base_query(workspace)
1225
    #
1226
    #     if content_type != content_type_list.Any_SLUG:
1227
    #         resultset = resultset.filter(Content.type==content_type)
1228
    #
1229
    #     return resultset.all()
1230
1231
    def get_last_active(
1232
            self,
1233
            workspace: Workspace=None,
1234
            limit: typing.Optional[int]=None,
1235
            before_content: typing.Optional[Content]= None,
1236
            content_ids: typing.Optional[typing.List[int]] = None,
1237
    ) -> typing.List[Content]:
1238
        """
1239
        get contents list sorted by last update
1240
        (last modification of content itself or one of this comment)
1241
        :param workspace: Workspace to check
1242
        :param limit: maximum number of elements to return
1243
        :param before_content: last_active content are only those updated
1244
         before this content given.
1245
        :param content_ids: restrict selection to some content ids and
1246
        related Comments
1247
        :return: list of content
1248
        """
1249
1250
        resultset = self._get_all_query(
1251
            workspace=workspace,
1252
        )
1253
        if content_ids:
1254
            resultset = resultset.filter(
1255
                or_(
1256
                    Content.content_id.in_(content_ids),
1257
                    and_(
1258
                        Content.parent_id.in_(content_ids),
1259
                        Content.type == content_type_list.Comment.slug
1260
                    )
1261
                )
1262
            )
1263
1264
        resultset = resultset.order_by(desc(ContentRevisionRO.updated), desc(ContentRevisionRO.revision_id), desc(ContentRevisionRO.content_id))
1265
1266
        active_contents = []
1267
        too_recent_content = []
1268
        before_content_find = False
1269
        for content in resultset:
1270
            related_active_content = None
1271
            if content_type_list.Comment.slug == content.type:
1272
                related_active_content = content.parent
1273
            else:
1274
                related_active_content = content
1275
1276
            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
1277
            # issue with comments
1278
            if not self._show_deleted and related_active_content.is_deleted:
1279
                continue
1280
            if not self._show_archived and related_active_content.is_archived:
1281
                continue
1282
1283
            if related_active_content not in active_contents and related_active_content not in too_recent_content:  # nopep8
1284
1285
                if not before_content or before_content_find:
1286
                    active_contents.append(related_active_content)
1287
                else:
1288
                    too_recent_content.append(related_active_content)
1289
1290
                if before_content and related_active_content == before_content:
1291
                    before_content_find = True
1292
1293
            if limit and len(active_contents) >= limit:
1294
                break
1295
1296
        return active_contents
1297
1298
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1299
    # usable and efficient for tracim v2 to get content with read/unread status
1300
    # instead of relying on has_new_information_for()
1301
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1302
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1303
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1304
    #     assert content_type is not None# DYN_REMOVE
1305
    #     assert isinstance(content_type, str) # DYN_REMOVE
1306
    #
1307
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1308
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1309
    #
1310
    #     not_read_revisions = self._revisions_base_query(workspace) \
1311
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1312
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1313
    #         .filter(Workspace.is_deleted.is_(False)) \
1314
    #         .subquery()
1315
    #
1316
    #     not_read_content_ids_query = self._session.query(
1317
    #         distinct(not_read_revisions.c.content_id)
1318
    #     )
1319
    #     not_read_content_ids = list(map(
1320
    #         itemgetter(0),
1321
    #         not_read_content_ids_query,
1322
    #     ))
1323
    #
1324
    #     not_read_contents = self._base_query(workspace) \
1325
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1326
    #         .order_by(desc(Content.updated))
1327
    #
1328
    #     if content_type != content_type_list.Any_SLUG:
1329
    #         not_read_contents = not_read_contents.filter(
1330
    #             Content.type==content_type)
1331
    #     else:
1332
    #         not_read_contents = not_read_contents.filter(
1333
    #             Content.type!=content_type_list.Folder.slug)
1334
    #
1335
    #     if parent_id:
1336
    #         not_read_contents = not_read_contents.filter(
1337
    #             Content.parent_id==parent_id)
1338
    #
1339
    #     result = []
1340
    #     for item in not_read_contents:
1341
    #         new_item = None
1342
    #         if content_type_list.Comment.slug == item.type:
1343
    #             new_item = item.parent
1344
    #         else:
1345
    #             new_item = item
1346
    #
1347
    #         # INFO - D.A. - 2015-05-20
1348
    #         # We do not want to show only one item if the last 10 items are
1349
    #         # comments about one thread for example
1350
    #         if new_item not in result:
1351
    #             result.append(new_item)
1352
    #
1353
    #         if len(result) >= limit:
1354
    #             break
1355
    #
1356
    #     return result
1357
1358
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
1359
        """
1360
        :param content: the given content instance
1361
        :param allowed_content_dict: must be something like this:
1362
            dict(
1363
                folder = True
1364
                thread = True,
1365
                file = False,
1366
                page = True
1367
            )
1368
        :return: nothing
1369
        """
1370
        properties = content.properties.copy()
1371
        properties['allowed_content'] = allowed_content_dict
1372
        content.properties = properties
1373
1374
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
1375
        """
1376
        :param content: the given content instance
1377
        :param allowed_content_type_slug_list: list of content_type_slug to
1378
        accept as subcontent.
1379
        :return: nothing
1380
        """
1381
        allowed_content_dict = {}
1382
        for allowed_content_type_slug in allowed_content_type_slug_list:
1383
            if allowed_content_type_slug not in content_type_list.endpoint_allowed_types_slug():
1384
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
1385
            allowed_content_dict[allowed_content_type_slug] = True
1386
1387
        self._set_allowed_content(content, allowed_content_dict)
1388
1389
    def restore_content_default_allowed_content(self, content: Content) -> None:
1390
        """
1391
        Return to default allowed_content_types
1392
        :param content: the given content instance
1393
        :return: nothing
1394
        """
1395
        if content._properties and 'allowed_content' in content._properties:
1396
            properties = content.properties.copy()
1397
            del properties['allowed_content']
1398
            content.properties = properties
1399
1400
    def set_status(self, content: Content, new_status: str):
1401
        if new_status in content_status_list.get_all_slugs_values():
1402
            content.status = new_status
1403
            content.revision_type = ActionDescription.STATUS_UPDATE
1404
        else:
1405
            raise ValueError('The given value {} is not allowed'.format(new_status))
1406
1407
    def move(self,
1408
             item: Content,
1409
             new_parent: Content,
1410
             must_stay_in_same_workspace: bool=True,
1411
             new_workspace: Workspace=None,
1412
    ):
1413
        if must_stay_in_same_workspace:
1414
            if new_parent and new_parent.workspace_id != item.workspace_id:
1415
                raise ValueError('the item should stay in the same workspace')
1416
1417
        item.parent = new_parent
1418
        if new_workspace:
1419
            item.workspace = new_workspace
1420
            if new_parent and \
1421
                    new_parent.workspace_id != new_workspace.workspace_id:
1422
                raise WorkspacesDoNotMatch(
1423
                    'new parent workspace and new workspace should be the same.'
1424
                )
1425
        else:
1426
            if new_parent:
1427
                item.workspace = new_parent.workspace
1428
1429
        self._is_filename_available_or_raise(
1430
            item.file_name,
1431
            item.workspace,
1432
            item.parent,
1433
            exclude_content_id=item.content_id
1434
        )
1435
        item.revision_type = ActionDescription.MOVE
1436
1437
    def copy(
1438
        self,
1439
        item: Content,
1440
        new_parent: Content=None,
1441
        new_label: str=None,
1442
        do_save: bool=True,
1443
        do_notify: bool=True,
1444
    ) -> Content:
1445
        """
1446
        Copy nearly all content, revision included. Children not included, see
1447
        "copy_children" for this.
1448
        :param item: Item to copy
1449
        :param new_parent: new parent of the new copied item
1450
        :param new_label: new label of the new copied item
1451
        :param do_notify: notify copy or not
1452
        :return: Newly copied item
1453
        """
1454
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
1455
            # TODO - G.M - 08-03-2018 - Use something else than value error
1456
            raise ValueError("You can't copy file into itself")
1457
        if new_parent:
1458
            workspace = new_parent.workspace
1459
            parent = new_parent
1460
        else:
1461
            workspace = item.workspace
1462
            parent = item.parent
1463
        label = new_label or item.label
1464
        filename = self._prepare_filename(label, item.file_extension)
1465
        self._is_filename_available_or_raise(filename, workspace, parent)
1466
        content = item.copy(parent)
1467
        # INFO - GM - 15-03-2018 - add "copy" revision
1468
        with new_revision(
1469
            session=self._session,
1470
            tm=transaction.manager,
1471
            content=content,
1472
            force_create_new_revision=True
1473
        ) as rev:
1474
            rev.parent = parent
1475
            rev.workspace = workspace
1476
            rev.file_name = filename
1477
            rev.revision_type = ActionDescription.COPY
1478
            rev.properties['origin'] = {
1479
                'content': item.id,
1480
                'revision': item.last_revision.revision_id,
1481
            }
1482
        if do_save:
1483
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
1484
        return content
1485
1486
    def copy_children(self, origin_content: Content, new_content: Content):
1487
        for child in origin_content.children:
1488
            self.copy(child, new_content)
1489
1490
    def move_recursively(self, item: Content,
1491
                         new_parent: Content, new_workspace: Workspace):
1492
        self.move(item, new_parent, False, new_workspace)
1493
        self.save(item, do_notify=False)
1494
1495
        for child in item.children:
1496
            with new_revision(
1497
                session=self._session,
1498
                tm=transaction.manager,
1499
                content=child
1500
            ):
1501
                self.move_recursively(child, item, new_workspace)
1502
        return
1503
1504
    def is_editable(self, item: Content) -> bool:
1505
        return not item.is_readonly \
1506
               and item.is_active \
1507
               and item.get_status().is_editable()
1508
1509
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
1510
        if not self.is_editable(item):
1511
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8
1512
        if item.label == new_label and item.description == new_content:
1513
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
1514
            # Internatization disabled in libcontent for now.
1515
            raise SameValueError('The content did not changed')
1516
        if not new_label:
1517
            raise EmptyLabelNotAllowed()
1518
1519
        label = new_label or item.label
1520
        filename = self._prepare_filename(label, item.file_extension)
1521
        self._is_filename_available_or_raise(
1522
            filename,
1523
            item.workspace,
1524
            item.parent,
1525
            exclude_content_id=item.content_id
1526
        )
1527
1528
        item.owner = self._user
1529
        item.label = new_label
1530
        item.description = new_content if new_content else item.description # TODO: convert urls into links
1531
        item.revision_type = ActionDescription.EDITION
1532
        return item
1533
1534
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
1535
        if not self.is_editable(item):
1536
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8
1537
        # FIXME - G.M - 2018-09-25 - Repair and do a better same content check,
1538
        # as pyramid behaviour use buffered object
1539
        # new_content == item.depot_file.file.read() case cannot happened using
1540
        # whenever new_content.read() == item.depot_file.file.read().
1541
        # as this behaviour can create struggle with big file, simple solution
1542
        # using read can be used everytime.
1543
        # if new_mimetype == item.file_mimetype and \
1544
        #         new_content == item.depot_file.file.read():
1545
        #     raise SameValueError('The content did not changed')
1546
        item.owner = self._user
1547
        self._is_filename_available_or_raise(
1548
            new_filename,
1549
            item.workspace,
1550
            item.parent,
1551
            exclude_content_id=item.content_id
1552
        )
1553
        item.file_name = new_filename
1554
        item.file_mimetype = new_mimetype
1555
        item.depot_file = FileIntent(
1556
            new_content,
1557
            new_filename,
1558
            new_mimetype,
1559
        )
1560
        item.revision_type = ActionDescription.REVISION
1561
        return item
1562
1563
    def archive(self, content: Content):
1564
        content.owner = self._user
1565
        content.is_archived = True
1566
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1567
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1568
        # un-archiving file.
1569
        label = '{label}-{action}-{date}'.format(
1570
            label=content.label,
1571
            action='archived',
1572
            date=current_date_for_filename()
1573
        )
1574
        filename = self._prepare_filename(label, content.file_extension)
1575
        self._is_filename_available_or_raise(
1576
            filename,
1577
            content.workspace,
1578
            content.parent,
1579
            exclude_content_id=content.content_id
1580
        )
1581
        content.file_name = filename
1582
        content.revision_type = ActionDescription.ARCHIVING
1583
1584
    def unarchive(self, content: Content):
1585
        content.owner = self._user
1586
        content.is_archived = False
1587
        content.revision_type = ActionDescription.UNARCHIVING
1588
1589
    def delete(self, content: Content):
1590
        content.owner = self._user
1591
        content.is_deleted = True
1592
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1593
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1594
        # un-deleting file.
1595
        label = '{label}-{action}-{date}'.format(
1596
            label=content.label,
1597
            action='deleted',
1598
            date=current_date_for_filename()
1599
        )
1600
        filename = self._prepare_filename(label, content.file_extension)
1601
        self._is_filename_available_or_raise(
1602
            filename,
1603
            content.workspace,
1604
            content.parent,
1605
            exclude_content_id=content.content_id
1606
        )
1607
        content.file_name = filename
1608
        content.revision_type = ActionDescription.DELETION
1609
1610
    def undelete(self, content: Content):
1611
        content.owner = self._user
1612
        content.is_deleted = False
1613
        content.revision_type = ActionDescription.UNDELETION
1614
1615
    def get_preview_page_nb(self, revision_id: int, file_extension: str) -> typing.Optional[int]:  # nopep8
1616
        file_path = self.get_one_revision_filepath(revision_id)
1617
        try:
1618
            nb_pages = self.preview_manager.get_page_nb(
1619
                file_path,
1620
                file_ext=file_extension
1621
            )
1622
        except UnsupportedMimeType:
1623
            return None
1624
        except Exception as e:
1625
            logger.warning(
1626
                self,
1627
                "Unknown Preview_Generator Exception Occured : {}".format(str(e))
1628
            )
1629
            logger.warning(self, traceback.format_exc())
1630
            return None
1631
        return nb_pages
1632
1633 View Code Duplication
    def has_pdf_preview(self, revision_id: int, file_extension: str) -> bool:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1634
        file_path = self.get_one_revision_filepath(revision_id)
1635
        try:
1636
            return self.preview_manager.has_pdf_preview(
1637
                file_path,
1638
                file_ext=file_extension
1639
            )
1640
        except UnsupportedMimeType:
1641
            return False
1642
        except Exception as e:
1643
            logger.warning(
1644
                self,
1645
                "Unknown Preview_Generator Exception Occured : {}".format(str(e))
1646
            )
1647
            logger.warning(self, traceback.format_exc())
1648
            return False
1649
1650 View Code Duplication
    def has_jpeg_preview(self, revision_id: int, file_extension: str) -> bool:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1651
        file_path = self.get_one_revision_filepath(revision_id)
1652
        try:
1653
            return self.preview_manager.has_jpeg_preview(
1654
                file_path,
1655
                file_ext=file_extension
1656
            )
1657
        except UnsupportedMimeType:
1658
            return False
1659
        except Exception as e:
1660
            logger.warning(
1661
                self,
1662
                "Unknown Preview_Generator Exception Occured : {}".format(str(e))
1663
            )
1664
            logger.warning(self, traceback.format_exc())
1665
            return False
1666
1667
    def mark_read__all(
1668
            self,
1669
            read_datetime: datetime=None,
1670
            do_flush: bool=True,
1671
            recursive: bool=True
1672
    ) -> None:
1673
        """
1674
        Read content of all workspace visible for the user.
1675
        :param read_datetime: date of readigin
1676
        :param do_flush: flush database
1677
        :param recursive: mark read subcontent too
1678
        :return: nothing
1679
        """
1680
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive) # nopep8
1681
1682
    def mark_read__workspace(self,
1683
                       workspace : Workspace,
1684
                       read_datetime: datetime=None,
1685
                       do_flush: bool=True,
1686
                       recursive: bool=True
1687
    ) -> None:
1688
        """
1689
        Read content of a workspace visible for the user.
1690
        :param read_datetime: date of readigin
1691
        :param do_flush: flush database
1692
        :param recursive: mark read subcontent too
1693
        :return: nothing
1694
        """
1695
        itemset = self.get_last_active(workspace)
1696
        for item in itemset:
1697
            if item.has_new_information_for(self._user):
1698
                self.mark_read(item, read_datetime, do_flush, recursive)
1699
1700
    def mark_read(
1701
            self,
1702
            content: Content,
1703
            read_datetime: datetime=None,
1704
            do_flush: bool=True,
1705
            recursive: bool=True
1706
    ) -> Content:
1707
        """
1708
        Read content for the user.
1709
        :param read_datetime: date of readigin
1710
        :param do_flush: flush database
1711
        :param recursive: mark read subcontent too
1712
        :return: nothing
1713
        """
1714
        assert self._user
1715
        assert content
1716
1717
        # The algorithm is:
1718
        # 1. define the read datetime
1719
        # 2. update all revisions related to current Content
1720
        # 3. do the same for all child revisions
1721
        #    (ie parent_id is content_id of current content)
1722
1723
        if not read_datetime:
1724
            read_datetime = datetime.datetime.now()
1725
1726
        viewed_revisions = self._session.query(ContentRevisionRO) \
1727
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1728
1729
        for revision in viewed_revisions:
1730
            revision.read_by[self._user] = read_datetime
1731
1732
        if recursive:
1733
            # mark read :
1734
            # - all children
1735
            # - parent stuff (if you mark a comment as read,
1736
            #                 then you have seen the parent)
1737
            # - parent comments
1738
            for child in content.get_valid_children():
1739
                self.mark_read(child, read_datetime=read_datetime,
1740
                               do_flush=False)
1741
1742
            if content_type_list.Comment.slug == content.type:
1743
                self.mark_read(content.parent, read_datetime=read_datetime,
1744
                               do_flush=False, recursive=False)
1745
                for comment in content.parent.get_comments():
1746
                    if comment != content:
1747
                        self.mark_read(comment, read_datetime=read_datetime,
1748
                                       do_flush=False, recursive=False)
1749
1750
        if do_flush:
1751
            self.flush()
1752
1753
        return content
1754
1755
    def mark_unread(self, content: Content, do_flush=True) -> Content:
1756
        assert self._user
1757
        assert content
1758
1759
        revisions = self._session.query(ContentRevisionRO) \
1760
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1761
1762
        for revision in revisions:
1763
            try:
1764
                del revision.read_by[self._user]
1765
            except KeyError:
1766
                pass
1767
1768
        for child in content.get_valid_children():
1769
            self.mark_unread(child, do_flush=False)
1770
1771
        if do_flush:
1772
            self.flush()
1773
1774
        return content
1775
1776
    def flush(self):
1777
        self._session.flush()
1778
1779
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
1780
        """
1781
        Save an object, flush the session and set the revision_type property
1782
        :param content:
1783
        :param action_description:
1784
        :return:
1785
        """
1786
        assert action_description is None or action_description in ActionDescription.allowed_values()
1787
1788
        if not action_description:
1789
            # See if the last action has been modified
1790
            if content.revision_type==None or len(get_history(content.revision, 'revision_type'))<=0:
1791
                # The action has not been modified, so we set it to default edition
1792
                action_description = ActionDescription.EDITION
1793
1794
        if action_description:
1795
            content.revision_type = action_description
1796
1797
        if do_flush:
1798
            # INFO - 2015-09-03 - D.A.
1799
            # There are 2 flush because of the use
1800
            # of triggers for content creation
1801
            #
1802
            # (when creating a content, actually this is an insert of a new
1803
            # revision in content_revisions ; so the mark_read operation need
1804
            # to get full real data from database before to be prepared.
1805
1806
            self._session.add(content)
1807
            self._session.flush()
1808
1809
            # TODO - 2015-09-03 - D.A. - Do not use triggers
1810
            # We should create a new ContentRevisionRO object instead of Content
1811
            # This would help managing view/not viewed status
1812
            self.mark_read(content, do_flush=True)
1813
1814
        if do_notify:
1815
            self.do_notify(content)
1816
1817
    def do_notify(self, content: Content):
1818
        """
1819
        Allow to force notification for a given content. By default, it is
1820
        called during the .save() operation
1821
        :param content:
1822
        :return:
1823
        """
1824
        NotifierFactory.create(
1825
            config=self._config,
1826
            current_user=self._user,
1827
            session=self._session,
1828
        ).notify_content_update(content)
1829
1830
    def get_keywords(self, search_string, search_string_separators=None) -> [str]:
1831
        """
1832
        :param search_string: a list of coma-separated keywords
1833
        :return: a list of str (each keyword = 1 entry
1834
        """
1835
1836
        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS
1837
1838
        keywords = []
1839
        if search_string:
1840
            keywords = [keyword.strip() for keyword in re.split(search_string_separators, search_string)]
1841
1842
        return keywords
1843
1844
    def search(self, keywords: [str]) -> Query:
1845
        """
1846
        :return: a sorted list of Content items
1847
        """
1848
1849
        if len(keywords)<=0:
1850
            return None
1851
1852
        filter_group_label = list(Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords)
1853
        filter_group_desc = list(Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords)
1854
        title_keyworded_items = self._hard_filtered_base_query().\
1855
            filter(or_(*(filter_group_label+filter_group_desc))).\
1856
            options(joinedload('children_revisions')).\
1857
            options(joinedload('parent'))
1858
1859
        return title_keyworded_items
1860
1861
    def get_all_types(self) -> typing.List[ContentType]:
1862
        labels = content_type_list.endpoint_allowed_types_slug()
1863
        content_types = []
1864
        for label in labels:
1865
            content_types.append(content_type_list.get_one_by_slug(label))
1866
1867
        return content_types
1868
1869
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1870
    def exclude_unavailable(
1871
        self,
1872
        contents: typing.List[Content],
1873
    ) -> typing.List[Content]:
1874
        """
1875
        Update and return list with content under archived/deleted removed.
1876
        :param contents: List of contents to parse
1877
        """
1878
        for content in contents[:]:
1879
            if self.content_under_deleted(content) or self.content_under_archived(content):
1880
                contents.remove(content)
1881
        return contents
1882
1883
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1884
    def content_under_deleted(self, content: Content) -> bool:
1885
        if content.parent:
1886
            if content.parent.is_deleted:
1887
                return True
1888
            if content.parent.parent:
1889
                return self.content_under_deleted(content.parent)
1890
        return False
1891
1892
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1893
    def content_under_archived(self, content: Content) -> bool:
1894
        if content.parent:
1895
            if content.parent.is_archived:
1896
                return True
1897
            if content.parent.parent:
1898
                return self.content_under_archived(content.parent)
1899
        return False
1900
1901
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1902
    def find_one_by_unique_property(
1903
            self,
1904
            property_name: str,
1905
            property_value: str,
1906
            workspace: Workspace=None,
1907
    ) -> Content:
1908
        """
1909
        Return Content who contains given property.
1910
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
1911
        contains this property value.
1912
        :param property_name: Name of property
1913
        :param property_value: Value of property
1914
        :param workspace: Workspace who contains Content
1915
        :return: Found Content
1916
        """
1917
        # TODO - 20160602 - Bastien: Should be JSON type query
1918
        # see https://www.compose.io/articles/using-json-extensions-in-\
1919
        # postgresql-from-python-2/
1920
        query = self._base_query(workspace=workspace).filter(
1921
            Content._properties.like(
1922
                '%"{property_name}": "{property_value}"%'.format(
1923
                    property_name=property_name,
1924
                    property_value=property_value,
1925
                )
1926
            )
1927
        )
1928
        return query.one()
1929
1930
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1931
    def generate_folder_label(
1932
            self,
1933
            workspace: Workspace,
1934
            parent: Content=None,
1935
    ) -> str:
1936
        """
1937
        Generate a folder label
1938
        :param workspace: Future folder workspace
1939
        :param parent: Parent of foture folder (can be None)
1940
        :return: Generated folder name
1941
        """
1942
        _ = self.translator.get_translation
1943
        query = self._base_query(workspace=workspace)\
1944
            .filter(Content.label.ilike('{0}%'.format(
1945
                _('New folder'),
1946
            )))
1947
        if parent:
1948
            query = query.filter(Content.parent == parent)
1949
1950
        return _('New folder {0}').format(
1951
            query.count() + 1,
1952
        )
1953