Passed
Pull Request — develop (#87)
by inkhey
01:30
created

ContentApi.undelete()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import typing
6
from contextlib import contextmanager
7
8
import sqlalchemy
9
import transaction
10
from depot.io.utils import FileIntent
11
from depot.manager import DepotManager
12
from preview_generator.exception import UnavailablePreviewType
13
from preview_generator.exception import UnsupportedMimeType
14
from preview_generator.manager import PreviewManager
15
from sqlalchemy import desc
16
from sqlalchemy import func
17
from sqlalchemy import or_
18
from sqlalchemy.orm import Query
19
from sqlalchemy.orm import aliased
20
from sqlalchemy.orm import joinedload
21
from sqlalchemy.orm.attributes import QueryableAttribute
22
from sqlalchemy.orm.attributes import get_history
23
from sqlalchemy.orm.exc import NoResultFound
24
from sqlalchemy.orm.session import Session
25
from sqlalchemy.sql.elements import and_
26
27
from tracim_backend.app_models.contents import content_status_list
28
from tracim_backend.app_models.contents import content_type_list
29
from tracim_backend.app_models.contents import FOLDER_TYPE
30
from tracim_backend.app_models.contents import ContentStatus
31
from tracim_backend.app_models.contents import ContentType
32
from tracim_backend.app_models.contents import GlobalStatus
33
from tracim_backend.exceptions import ContentInNotEditableState
34
from tracim_backend.exceptions import ContentLabelAlreadyUsedHere
35
from tracim_backend.exceptions import ContentNotFound
36
from tracim_backend.exceptions import ContentTypeNotExist
37
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
38
from tracim_backend.exceptions import EmptyLabelNotAllowed
39
from tracim_backend.exceptions import PageOfPreviewNotFound
40
from tracim_backend.exceptions import PreviewDimNotAllowed
41
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
42
from tracim_backend.exceptions import SameValueError
43
from tracim_backend.exceptions import TracimUnavailablePreviewType
44
from tracim_backend.exceptions import UnallowedSubContent
45
from tracim_backend.exceptions import UnavailablePreview
46
from tracim_backend.exceptions import WorkspacesDoNotMatch
47
from tracim_backend.lib.core.notifications import NotifierFactory
48
from tracim_backend.lib.utils.logger import logger
49
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
50
from tracim_backend.lib.utils.translation import Translator
51
from tracim_backend.lib.utils.utils import cmp_to_key
52
from tracim_backend.lib.utils.utils import current_date_for_filename
53
from tracim_backend.lib.utils.utils import preview_manager_page_format
54
from tracim_backend.models.auth import User
55
from tracim_backend.models.context_models import ContentInContext
56
from tracim_backend.models.context_models import PreviewAllowedDim
57
from tracim_backend.models.context_models import RevisionInContext
58
from tracim_backend.models.data import ActionDescription
59
from tracim_backend.models.data import Content
60
from tracim_backend.models.data import ContentRevisionRO
61
from tracim_backend.models.data import NodeTreeItem
62
from tracim_backend.models.data import RevisionReadStatus
63
from tracim_backend.models.data import UserRoleInWorkspace
64
from tracim_backend.models.data import Workspace
65
from tracim_backend.models.revision_protection import new_revision
66
67
__author__ = 'damien'
68
69
70
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
71
def compare_content_for_sorting_by_type_and_name(
        content1: Content,
        content2: Content
) -> int:
    """
    Three-way comparison of two contents, usable through cmp_to_key.

    Contents of the same type are ordered by their lower-cased label.
    Contents of different types are ordered by type:
    folder, page, thread, then file.

    :param content1: left-hand content
    :param content2: right-hand content
    :return:    1 if content1 > content2
                -1 if content1 < content2
                0 if content1 = content2
    """
    if content1.type != content2.type:
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
        type_rank = [
            content_type_list.Folder.slug,
            content_type_list.Page.slug,
            content_type_list.Thread.slug,
            content_type_list.File.slug,
        ]
        # list.index raises ValueError for a type that is not ranked above.
        left_rank = type_rank.index(content1.type)
        right_rank = type_rank.index(content2.type)
        return (left_rank > right_rank) - (left_rank < right_rank)

    # Same type: fall back to a case-insensitive label comparison.
    left_label = content1.get_label().lower()
    right_label = content2.get_label().lower()
    return (left_label > right_label) - (left_label < right_label)
108
109
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
110
def compare_tree_items_for_sorting_by_type_and_name(
        item1: NodeTreeItem,
        item2: NodeTreeItem
) -> int:
    """
    Three-way comparison of two tree items, based on the content held in
    their ``node`` attribute.

    :param item1: left-hand tree item
    :param item2: right-hand tree item
    :return: result of compare_content_for_sorting_by_type_and_name applied
    to both nodes
    """
    left_node, right_node = item1.node, item2.node
    return compare_content_for_sorting_by_type_and_name(left_node, right_node)
115
116
117
class ContentApi(object):
118
119
    SEARCH_SEPARATORS = ',| '
120
    SEARCH_DEFAULT_RESULT_NB = 50
121
122
    # DISPLAYABLE_CONTENTS = (
123
    #     content_type_list.Folder.slug,
124
    #     content_type_list.File.slug,
125
    #     content_type_list.Comment.slug,
126
    #     content_type_list.Thread.slug,
127
    #     content_type_list.Page.slug,
128
    #     content_type_list.Page.slugLegacy,
129
    #     ContentType.MarkdownPage,
130
    # )
131
132
    def __init__(
            self,
            session: Session,
            current_user: typing.Optional[User],
            config,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
            show_active: bool = True,
            all_content_in_treeview: bool = True,
            force_show_all_types: bool = False,
            disable_user_workspaces_filter: bool = False,
    ) -> None:
        """
        :param session: database session used to build every query
        :param current_user: user on behalf of whom the api is used, or None
        :param config: application config; PREVIEW_CACHE_DIR is read from it
        and it is handed to the Translator
        :param show_archived: do not filter out archived contents
        :param show_deleted: do not filter out deleted contents
        :param show_temporary: do not filter out temporary contents
        :param show_active: if False, the base query keeps only deleted or
        archived contents
        :param all_content_in_treeview: stored as-is on the instance
        (NOTE(review): not read by the code visible here - confirm usage)
        :param force_show_all_types: skip the filter on allowed content types
        :param disable_user_workspaces_filter: skip the filter on the
        workspaces of the current user
        """
        # Collaborators
        self._session = session
        self._user = current_user
        self._config = config
        self._user_id = None if not current_user else current_user.user_id

        # Visibility flags used by the base queries
        self._show_archived = show_archived
        self._show_deleted = show_deleted
        self._show_temporary = show_temporary
        self._show_active = show_active
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
        self._force_show_all_types = force_show_all_types
        self._disable_user_workspaces_filter = disable_user_workspaces_filter

        self.preview_manager = PreviewManager(
            self._config.PREVIEW_CACHE_DIR,
            create_folder=True,
        )

        # Language of the user when there is one, fallback language otherwise
        user_lang = self._user.lang if self._user else None
        self.translator = Translator(
            app_config=self._config,
            default_lang=user_lang or DEFAULT_FALLBACK_LANG,
        )
163
164
    @contextmanager
165
    def show(
166
            self,
167
            show_archived: bool=False,
168
            show_deleted: bool=False,
169
            show_temporary: bool=False,
170
    ) -> typing.Generator['ContentApi', None, None]:
171
        """
172
        Use this method as context manager to update show_archived,
173
        show_deleted and show_temporary properties during context.
174
        :param show_archived: show archived contents
175
        :param show_deleted:  show deleted contents
176
        :param show_temporary:  show temporary contents
177
        """
178
        previous_show_archived = self._show_archived
179
        previous_show_deleted = self._show_deleted
180
        previous_show_temporary = self._show_temporary
181
182
        try:
183
            self._show_archived = show_archived
184
            self._show_deleted = show_deleted
185
            self._show_temporary = show_temporary
186
            yield self
187
        finally:
188
            self._show_archived = previous_show_archived
189
            self._show_deleted = previous_show_deleted
190
            self._show_temporary = previous_show_temporary
191
192
    def get_content_in_context(self, content: Content) -> ContentInContext:
        """
        Wrap a content into a ContentInContext built with the session,
        config and user of this api.

        :param content: content to wrap
        :return: the ContentInContext for this content
        """
        content_in_context = ContentInContext(
            content,
            self._session,
            self._config,
            self._user,
        )
        return content_in_context
194
195
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
        """
        Wrap a revision into a RevisionInContext built with the session,
        config and user of this api.

        :param revision: revision to wrap
        :return: the RevisionInContext for this revision
        """
        # TODO - G.M - 2018-06-173 - create revision in context object
        revision_in_context = RevisionInContext(
            revision,
            self._session,
            self._config,
            self._user,
        )
        return revision_in_context
198
    
199
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
        """
        Build the Content/ContentRevision join condition: the revision must
        belong to the content and must be the one with the highest
        revision_id for that content.

        :return: Content/ContentRevision query join condition
        """
        # Subquery, correlated on Content, selecting the highest revision_id
        # among the revisions of the content.
        latest_revision_id = self._session.query(ContentRevisionRO.revision_id)\
            .filter(ContentRevisionRO.content_id == Content.id)\
            .order_by(ContentRevisionRO.revision_id.desc())\
            .limit(1)\
            .correlate(Content)

        return and_(
            Content.id == ContentRevisionRO.content_id,
            ContentRevisionRO.revision_id == latest_revision_id,
        )
211
212
    def get_canonical_query(self) -> Query:
        """
        Return the base Content query, joined with ContentRevisionRO using
        the condition built by _get_revision_join (last revision only).

        :return: Content/ContentRevision Query
        """
        revision_join = self._get_revision_join()
        content_query = self._session.query(Content)
        return content_query.join(ContentRevisionRO, revision_join)
219
220
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
221
    @classmethod
    def sort_tree_items(
        cls,
        content_list: typing.List[NodeTreeItem],
    )-> typing.List[NodeTreeItem]:
        """
        Sort tree items in place, by content type then by label
        (see compare_tree_items_for_sorting_by_type_and_name).

        :param content_list: tree items to sort; the list itself is modified
        :return: the same list object, once sorted
        """
        # The unused copy of the list that was built here has been dropped:
        # it was never read.
        content_list.sort(key=cmp_to_key(
            compare_tree_items_for_sorting_by_type_and_name,
        ))

        return content_list
235
236
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
237
    @classmethod
    def sort_content(
        cls,
        content_list: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Sort contents in place, by content type then by label
        (see compare_content_for_sorting_by_type_and_name).

        :param content_list: contents to sort; the list itself is modified
        :return: the same list object, once sorted
        """
        ordering_key = cmp_to_key(compare_content_for_sorting_by_type_and_name)
        content_list.sort(key=ordering_key)
        return content_list
244
245
    def __real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        """
        Build the Content query shared by the other base queries: content
        type filter, optional workspace filter and user access filter.
        No deleted/archived/temporary filtering is done here.

        :param workspace: if given, restrict the query to this workspace
        :return: Content query
        """
        query = self.get_canonical_query()

        # Exclude non displayable types
        if not self._force_show_all_types:
            allowed_slugs = content_type_list.query_allowed_types_slugs()
            query = query.filter(Content.type.in_(allowed_slugs))

        if workspace:
            query = query.filter(Content.workspace_id == workspace.workspace_id)

        # Security layer: the user filter applies only when a user is
        # provided and the filter has not been disabled.
        if not self._user or self._disable_user_workspaces_filter:
            return query

        user = self._session.query(User).get(self._user_id)
        # Workspaces where the user has at least the reader role
        readable_workspace_ids = [
            role.workspace_id
            for role in user.roles
            if role.role >= UserRoleInWorkspace.READER
        ]
        in_readable_workspace = Content.workspace_id.in_(readable_workspace_ids)
        # Also allow access to non workspace documents owned by the user.
        # "== None" is intentional: it is a SQLAlchemy column comparison.
        owned_without_workspace = and_(
            Content.workspace_id == None,  # noqa: E711
            Content.owner_id == self._user_id,
        )
        return query.filter(or_(in_readable_workspace, owned_without_workspace))
275
276
    def _base_query(self, workspace: Workspace=None) -> Query:
        """
        Content query with the visibility flags of this api applied on top
        of the real base query.

        :param workspace: if given, restrict the query to this workspace
        :return: filtered Content query
        """
        query = self.__real_base_query(workspace)

        # Without active contents, keep only the deleted or archived ones.
        if not self._show_active:
            query = query.filter(or_(
                Content.is_deleted == True,  # noqa: E712
                Content.is_archived == True,  # noqa: E712
            ))

        # Each flag which is not shown excludes the matching contents.
        hidden_when_not_shown = (
            (self._show_deleted, Content.is_deleted),
            (self._show_archived, Content.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, flag_column in hidden_when_not_shown:
            if not is_shown:
                query = query.filter(flag_column == False)  # noqa: E712

        return query
294
295
    def __revisions_real_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the ContentRevisionRO query shared by the revision queries:
        content type filter, optional workspace filter and user workspaces
        filter. No deleted/archived/temporary filtering is done here.

        :param workspace: if given, restrict the query to this workspace
        :return: ContentRevisionRO query
        """
        result = self._session.query(ContentRevisionRO)

        # Exclude non displayable types
        if not self._force_show_all_types:
            # The DISPLAYABLE_CONTENTS class attribute exists only as
            # commented-out code, so the allowed slugs are taken from
            # content_type_list, like in __real_base_query.
            # NOTE(review): the filter is on Content.type while the query
            # selects ContentRevisionRO - confirm this is the intended column.
            result = result.filter(Content.type.in_(content_type_list.query_allowed_types_slugs()))  # nopep8

        if workspace:
            result = result.filter(ContentRevisionRO.workspace_id==workspace.workspace_id)

        if self._user:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces: keep the workspaces where
            # the user has at least the reader role
            workspace_ids = [r.workspace_id for r in user.roles
                             if r.role>=UserRoleInWorkspace.READER]
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))

        return result
316
317
    def _revisions_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        ContentRevisionRO query with the deleted / archived / temporary
        flags of this api applied on top of the real revisions base query.

        :param workspace: if given, restrict the query to this workspace
        :return: filtered ContentRevisionRO query
        """
        query = self.__revisions_real_base_query(workspace)

        # NOTE(review): the temporary flag is tested on Content whereas the
        # two others are tested on ContentRevisionRO - confirm it is intended.
        hidden_when_not_shown = (
            (self._show_deleted, ContentRevisionRO.is_deleted),
            (self._show_archived, ContentRevisionRO.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, flag_column in hidden_when_not_shown:
            if not is_shown:
                query = query.filter(flag_column == False)  # noqa: E712

        return query
333
334
    def _hard_filtered_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Content query where the deleted / archived / temporary filters are
        applied on the parent of the content as well as on the content.

        This is meant for the search() function, which also searches in
        comments (for example) which may be 'not deleted' while the
        associated content is deleted.

        :param workspace: if given, restrict the query to this workspace
        :return: filtered Content query
        """
        query = self.__real_base_query(workspace)

        flags_to_check = (
            (self._show_deleted, 'is_deleted'),
            (self._show_archived, 'is_archived'),
            (self._show_temporary, 'is_temporary'),
        )
        for is_shown, flag_name in flags_to_check:
            if is_shown:
                continue
            # One parent alias (and one join) per hidden flag.
            # NOTE(review): join() looks like an inner join here, which would
            # also drop contents without parent - confirm this is intended.
            parent = aliased(Content)
            query = query.join(parent, Content.parent)\
                .filter(getattr(Content, flag_name) == False)\
                .filter(getattr(parent, flag_name) == False)  # noqa: E712

        return query
368
369
    def get_base_query(
        self,
        workspace: Workspace,
    ) -> Query:
        """
        Public access to the base Content query (see _base_query).

        :param workspace: workspace the query is restricted to
        :return: filtered Content query
        """
        base_query = self._base_query(workspace)
        return base_query
374
375
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
376
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
377
    #     """
378
    #     This method returns child items (folders or items) for left bar treeview.
379
    # 
380
    #     :param parent:
381
    #     :param workspace:
382
    #     :param filter_by_allowed_content_types:
383
    #     :param removed_item_ids:
384
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
385
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
386
    #     :return:
387
    #     """
388
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
389
    #     removed_item_ids = removed_item_ids or []  # FDV
390
    # 
391
    #     if not allowed_node_types:
392
    #         allowed_node_types = [content_type_list.Folder.slug]
393
    #     elif allowed_node_types==content_type_list.Any_SLUG:
394
    #         allowed_node_types = ContentType.all()
395
    # 
396
    #     parent_id = parent.content_id if parent else None
397
    #     folders = self._base_query(workspace).\
398
    #         filter(Content.parent_id==parent_id).\
399
    #         filter(Content.type.in_(allowed_node_types)).\
400
    #         filter(Content.content_id.notin_(removed_item_ids)).\
401
    #         all()
402
    # 
403
    #     if not filter_by_allowed_content_types or \
404
    #                     len(filter_by_allowed_content_types)<=0:
405
    #         # Standard case for the left treeview: we want to show all contents
406
    #         # in the left treeview... so we still filter because for example
407
    #         # comments must not appear in the treeview
408
    #         return [folder for folder in folders \
409
    #                 if folder.type in ContentType.allowed_types_for_folding()]
410
    # 
411
    #     # Now this is a case of Folders only (used for moving content)
412
    #     # When moving a content, you must get only folders that allow to be filled
413
    #     # with the type of content you want to move
414
    #     result = []
415
    #     for folder in folders:
416
    #         for allowed_content_type in filter_by_allowed_content_types:
417
    # 
418
    #             is_folder = folder.type == content_type_list.Folder.slug
419
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
420
    # 
421
    #             if is_folder and content_type__allowed:
422
    #                 result.append(folder)
423
    #                 break
424
    # 
425
    #     return result
426
427
    def _is_filename_available(
            self,
            filename: str,
            workspace: Workspace,
            parent: Content = None,
            exclude_content_id: int = None,
    ) -> bool:
        """
        Check if content filename is free in the given workspace and parent.

        The filename is split into label and file extension, and contents
        with the same label and extension, same workspace and same parent
        (or no parent if parent is not given) are counted.

        :param filename: content filename (label + file extension)
        :param workspace: workspace of the content
        :param parent: parent of the content
        :param exclude_content_id: exclude a specific content_id (useful
        to verify  other content for content update)
        :return: True if content filename is available, False if it is
        already used. More than one match is logged as a critical error
        and also returns False.
        """
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
        # with empty filename like comment.
        assert filename
        assert workspace
        label, file_extension = os.path.splitext(filename)
        query = self.get_base_query(workspace)

        if parent:
            query = query.filter(Content.parent_id == parent.content_id)
        else:
            # "== None" is intentional: it is a SQLAlchemy column comparison
            query = query.filter(Content.parent_id == None)  # noqa: E711

        if exclude_content_id:
            query = query.filter(Content.content_id != exclude_content_id)
        query = query.filter(Content.workspace_id == workspace.workspace_id)

        nb_content_with_the_filename = query.\
            filter(Content.label == label).\
            filter(Content.file_extension == file_extension).\
            count()
        if nb_content_with_the_filename == 0:
            return True
        if nb_content_with_the_filename == 1:
            return False

        # More than one content with this filename: database is inconsistent.
        critical_error_text = 'Something is wrong in the database ! '\
                              'Content filename should be unique ' \
                              'in a same folder in database ' \
                              'but you have {nb} content with ' \
                              'filename {filename} ' \
                              'in workspace {workspace_id}'

        critical_error_text = critical_error_text.format(
            nb=nb_content_with_the_filename,
            filename=filename,
            workspace_id=workspace.workspace_id,
        )
        if parent:
            # Report the id of the parent itself, not the id of its own parent
            critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
                text=critical_error_text,
                parent_id=parent.content_id
            )
        logger.critical(self, critical_error_text)
        return False
487
488
    def _prepare_filename(
489
            self,
490
            label: str,
491
            file_extension: str,
492
    ) -> str:
493
        """
494
        generate correct file_name from label and file_extension
495
        :return: valid
496
        """
497
        # TODO - G.M - 2018-10-11 - Find a way to
498
        # Refactor this in order to use same method
499
        # in both contentLib and .file_name of content
500
        return '{label}{file_extension}'.format(
501
            label=label,
502
            file_extension=file_extension
503
        )
504
505
    def _is_filename_available_or_raise(
        self,
        filename: str,
        workspace: Workspace,
        parent: Content = None,
        exclude_content_id: int = None,
    ) -> bool:
        """
        Same as _is_filename_available but raise exception instead of
        returning False if content filename is already used.

        :param filename: content filename (label + file extension)
        :param workspace: workspace of the content
        :param parent: parent of the content
        :param exclude_content_id: content_id to ignore during the check
        :return: True if content filename is available
        :raise ContentLabelAlreadyUsedHere: if the filename is already used
        """
        if self._is_filename_available(
            filename,
            workspace,
            parent,
            exclude_content_id
        ):
            return True
        # INFO - G.M - 2018-10-11 - prepare exception message
        exception_message = 'A Content already exist with the same filename ' \
            '{filename}  in workspace {workspace_id}'
        exception_message = exception_message.format(
            filename=filename,
            workspace_id=workspace.workspace_id,
        )
        if parent:
            # Report the id of the parent itself, not the id of its own parent
            exception_message = '{text} and parent as content {parent_id}'.format(
                text=exception_message,
                parent_id=parent.content_id
            )
        raise ContentLabelAlreadyUsedHere(exception_message)
536
537
    def create(
            self,
            content_type_slug: str,
            workspace: Workspace,
            parent: Content=None,
            label: str = '',
            filename: str = '',
            do_save=False,
            is_temporary: bool=False,
            do_notify=True,
    ) -> Content:
        """
        Create a new content of the given type.

        Either label or filename may be given, not both. A folder created
        without label gets a generated one. When label is given, the
        filename is built from it ('.html' extension for pages and threads).
        Comments are the only contents allowed without label nor filename.

        :param content_type_slug: slug of the type of the new content
        :param workspace: workspace of the new content; if not given, the
        workspace of the parent is used
        :param parent: parent of the new content
        :param label: label of the new content
        :param filename: filename of the new content
        :param do_save: if True, add the content to the session and save it
        :param is_temporary: value of the is_temporary flag of the content
        :param do_notify: passed to save() when do_save is True
        :return: the new content
        :raise UnallowedSubContent: if the content type is not allowed in
        the parent or in the workspace
        :raise EmptyLabelNotAllowed: if no label nor filename is given for
        a content which is not a comment
        :raise ContentLabelAlreadyUsedHere: if the filename is already used
        """
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != content_type_list.Any_SLUG
        assert not (label and filename)

        if content_type_slug == FOLDER_TYPE and not label:
            label = self.generate_folder_label(workspace, parent)

        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
        # can call here with None. Must update create_comment tu require the
        # workspace.
        # Guarded on parent: without parent there is no workspace to fall
        # back on, and parent.workspace would fail on None.
        if not workspace and parent:
            workspace = parent.workspace

        content_type = content_type_list.get_one_by_slug(content_type_slug)
        if parent and parent.properties and 'allowed_content' in parent.properties:
            allowed_content = parent.properties['allowed_content']
            if content_type.slug not in allowed_content or not allowed_content[content_type.slug]:  # nopep8
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))

        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        content_id=workspace.workspace_id,
                    )
                )

        if filename or label:
            if label:
                file_extension = ''
                if content_type.slug in (
                        content_type_list.Page.slug,
                        content_type_list.Thread.slug,
                ):
                    file_extension = '.html'
                filename = self._prepare_filename(label, file_extension)
            self._is_filename_available_or_raise(
                filename,
                workspace,
                parent,
            )
            content = Content()
            # INFO - G.M - 2018-07-04 - File_name setting automatically
            # set label and file_extension
            content.file_name = filename
        elif content_type_slug == content_type_list.Comment.slug:
            content = Content()
            # INFO - G.M - 2018-07-16 - Default label for comments is
            # empty string.
            content.label = ''
        else:
            raise EmptyLabelNotAllowed(
                'Content of type {} should have a valid label'.format(content_type_slug)  # nopep8
            )

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content
611
612
    def create_comment(self, workspace: Workspace=None, parent: Content=None, content:str ='', do_save=False) -> Content:
        """
        Create a comment on the given parent content.

        The comment is created through create() (unsaved, without
        notification), then its description is set to the comment text.

        :param workspace: workspace of the comment, passed as-is to create()
        :param parent: commented content; required, and must not be a folder
        :param content: text of the comment, must not be empty
        :param do_save: if True, save the comment
        :return: the new comment
        :raise EmptyCommentContentNotAllowed: if content is empty
        """
        # TODO: check parent allowed_type and workspace allowed_ type
        assert parent and parent.type != FOLDER_TYPE
        if not content:
            raise EmptyCommentContentNotAllowed()

        comment = self.create(
            content_type_slug=content_type_list.Comment.slug,
            workspace=workspace,
            parent=parent,
            label='',
            do_save=False,
            do_notify=False,
        )
        comment.description = content
        comment.revision_type = ActionDescription.COMMENT

        if do_save:
            self.save(comment, ActionDescription.COMMENT)
        return comment
632
633
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
        """
        This method is a hack to convert a node revision item into a node:
        the content is fetched with get_one(), then its
        revision_to_serialize is set to the given revision.

        :param content_id: id of the content
        :param content_type: type of the content, passed to get_one()
        :param workspace: workspace of the content, passed to get_one()
        :param revision_id: id of the revision to attach to the content
        :return: the content, with revision_to_serialize set
        :raise ValueError: if the revision is not a revision of the content
        """
        content = self.get_one(content_id, content_type, workspace)
        revision = self._session.query(ContentRevisionRO)\
            .filter(ContentRevisionRO.revision_id==revision_id)\
            .one()

        if revision.content_id != content.content_id:
            raise ValueError('Revision not found for given content')

        content.revision_to_serialize = revision.revision_id
        return content
652
653
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
        """
        Get one content from its id, through the base query of this api.

        :param content_id: id of the content; if falsy, None is returned
        :param content_type: expected type of the content, or
        content_type_list.Any_SLUG to accept any type
        :param workspace: if given, restrict the query to this workspace
        :param parent: if given, the content must be a child of this content
        :return: the content, or None if content_id is falsy
        :raise ContentNotFound: if the query returns no result
        """
        if not content_id:
            return None

        query = self._base_query(workspace).filter(Content.content_id==content_id)

        type_is_constrained = content_type != content_type_list.Any_SLUG
        if type_is_constrained:
            query = query.filter(Content.type==content_type)

        if parent:
            query = query.filter(Content.parent_id==parent.content_id)  # nopep8

        try:
            return query.one()
        except NoResultFound as exc:
            # TODO - G.M - 2018-07-16 - Add better support for all different
            # error case who can happened here
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
            # wrong access to this workspace, wrong base filter according
            # to content_status.
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
676
677
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
        """
        Get directly any revision from its id.

        :param revision_id: id of the revision to return, must not be None
        :param content: the content related to the revision; if None, do not
        check if the revision is related to this content
        :return: the revision
        :raise RevisionDoesNotMatchThisContent: if content is given and the
        revision is not one of its revisions
        """
        assert revision_id is not None# DYN_REMOVE

        revision = self._session.query(ContentRevisionRO)\
            .filter(ContentRevisionRO.revision_id == revision_id)\
            .one()

        belongs_to_other_content = (
            content and revision.content_id != content.content_id
        )
        if not belongs_to_other_content:
            return revision

        raise RevisionDoesNotMatchThisContent(
            'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
                revision_id=revision.revision_id,
                content_id=content.content_id,
            )
        )
696
697
    # INFO - A.P - 2017-07-03 - python file object getter
698
    # in case of we cook a version of preview manager that allows a pythonic
699
    # access to files
700
    # def get_one_revision_file(self, revision_id: int = None):
701
    #     """
702
    #     This function allows us to directly get a Python file object from its
703
    #     revision identifier.
704
    #     :param revision_id: The revision id of the file we want to return
705
    #     :return: The corresponding Python file object
706
    #     """
707
    #     revision = self.get_one_revision(revision_id)
708
    #     return DepotManager.get().get(revision.depot_file)
709
710
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
        """
        Get directly a file path from a revision identifier: the depot file
        of the revision is looked up in the default depot and its
        ``_file_path`` attribute is returned.

        :param revision_id: The revision id of the filepath we want to return
        :return: The corresponding filepath
        """
        revision = self.get_one_revision(revision_id)
        stored_file = DepotManager.get().get(revision.depot_file)  # type: StoredFile
        # NOTE(review): _file_path is a private attribute of the stored file
        return stored_file._file_path
722
723
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
724
    def get_one_by_label_and_parent(
            self,
            content_label: str,
            content_parent: Content=None,
    ) -> Content:
        """
        Fetch the single Content matching a label (or file name) directly
        under a given parent.

        The base query is built with the workspace of the parent, or with
        None when no parent is given; in that case only contents without
        parent are searched.
        Matching depends on the content type: files must match both the name
        and the extension obtained by splitting content_label, threads and
        pages only the name part, folders the complete content_label.
        :param content_label: label of the content, or its file name
        :param content_parent: parent content, None for contents without
        parent
        :return: the matching Content, as returned by Query.one()
        """
        if content_parent:
            workspace = content_parent.workspace
            parent_id = content_parent.content_id
        else:
            workspace = None
            parent_id = None

        base_name, extension = os.path.splitext(content_label)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        is_matching_file = and_(
            Content.type == content_type_list.File.slug,
            Content.label == base_name,
            Content.file_extension == extension,
        )
        is_matching_thread = and_(
            Content.type == content_type_list.Thread.slug,
            Content.label == base_name,
        )
        is_matching_page = and_(
            Content.type == content_type_list.Page.slug,
            Content.label == base_name,
        )
        is_matching_folder = and_(
            Content.type == content_type_list.Folder.slug,
            Content.label == content_label,
        )

        siblings = self._base_query(workspace).filter(
            Content.parent_id == parent_id
        )
        return siblings.filter(
            or_(
                is_matching_file,
                is_matching_thread,
                is_matching_page,
                is_matching_folder,
            )
        ).one()
766
767
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
768
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: [str]=None,
    ):
        """
        Find a content from its label, its workspace and, optionally, the
        labels of its parent folders.
        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing the content and its parents
        :param content_parent_labels: Ordered list of labels representing
        the path of the parent folder (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder.
        When empty or None, the content is searched among contents without
        parent.
        :return: Found Content, as returned by Query.one()
        """
        # Resolve the parent folder only when a parent path is given
        parent_folder = None
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )
        parent_id = parent_folder.content_id if parent_folder else None

        # Label matching rules are delegated to the shared label filter
        matching = self.filter_query_for_content_label_as_path(
            query=self._base_query(workspace),
            content_label_as_file=content_label,
        )
        # A None parent_id is compared as "no parent"
        matching = matching.filter(Content.parent_id == parent_id)
        matching = matching.filter(
            Content.workspace_id == workspace.workspace_id,
        )
        return matching.order_by(Content.revision_id.desc()).one()
820
821
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
822
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: [str],
            workspace: Workspace,
    ) -> Content:
        """
        Walk a relative folder path and return the folder it ends on.
        TODO BS 20161124: Not safe if web interface allow folder duplicate names
        :param path_labels: List of labels representing path of folder
        (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder matching the last label; None when
        path_labels is empty
        """
        base_query = self._base_query(workspace)
        current_folder = None

        for label in path_labels:
            # The first label is searched among folders without parent, the
            # next ones inside the folder found at the previous step
            parent_id = current_folder.content_id if current_folder else None
            level_query = base_query.filter(
                Content.type == content_type_list.Folder.slug,
                Content.label == label,
                Content.workspace_id == workspace.workspace_id,
            ).filter(Content.parent_id == parent_id)

            # Get the corresponding folder (Query.one() expects exactly one)
            current_folder = level_query.order_by(
                Content.revision_id.desc()
            ).one()

        return current_folder
864
865
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
866
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Restrict a query to the contents whose label corresponds to the
        given "file version" of a label.

        Files, threads and pages must match both the name and the extension
        obtained by splitting content_label_as_file; folders must match the
        complete value.
        :param query: query to modify
        :param content_label_as_file: label in its FILE version, see
        Content.get_label_as_file().
        :param is_case_sensitive: when False (default), both sides of each
        comparison are lowercased with func.lower
        :return: modified query
        """
        base_name, extension = os.path.splitext(content_label_as_file)

        if is_case_sensitive:
            label_filter = Content.label == content_label_as_file
            file_name_filter = Content.label == base_name
            file_extension_filter = Content.file_extension == extension
        else:
            label_filter = \
                func.lower(Content.label) == func.lower(content_label_as_file)
            file_name_filter = \
                func.lower(Content.label) == func.lower(base_name)
            file_extension_filter = \
                func.lower(Content.file_extension) == func.lower(extension)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        types_with_extension = (
            content_type_list.File,
            content_type_list.Thread,
            content_type_list.Page,
        )
        type_clauses = [
            and_(
                Content.type == content_type.slug,
                file_name_filter,
                file_extension_filter,
            )
            for content_type in types_with_extension
        ]
        type_clauses.append(
            and_(
                Content.type == content_type_list.Folder.slug,
                label_filter,
            )
        )
        return query.filter(or_(*type_clauses))
917
918
    def get_pdf_preview_path(
            self,
            content_id: int,
            revision_id: int,
            page_number: int
    ) -> str:
        """
        Get the pdf preview of one page of a content revision.
        :param content_id: id of content, only used in error messages
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content; it is converted with preview_manager_page_format() before
        use
        :return: preview_path as string
        :raise PageOfPreviewNotFound: converted page number is not lower
        than the number of pages reported by the preview manager
        :raise TracimUnavailablePreviewType: preview manager raised
        UnavailablePreviewType
        :raise UnavailablePreview: preview manager raised
        UnsupportedMimeType
        """
        source_path = self.get_one_revision_filepath(revision_id)
        try:
            page_index = preview_manager_page_format(page_number)
            if page_index >= self.preview_manager.get_page_nb(source_path):
                raise PageOfPreviewNotFound(
                    'page_number {page_number} of content {content_id} does not exist'.format(
                        page_number=page_index,
                        content_id=content_id
                    ),
                )
            return self.preview_manager.get_pdf_preview(
                source_path,
                page=page_index
            )
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id) # nopep8
            ) from exc
952
953
    def get_full_pdf_preview_path(self, revision_id: int) -> str:
        """
        Get the full (all pages) pdf preview of a content revision.
        :param revision_id: id of revision
        :return: path of the full pdf preview of this revision
        :raise TracimUnavailablePreviewType: preview manager raised
        UnavailablePreviewType
        :raise UnavailablePreview: preview manager raised
        UnsupportedMimeType
        """
        source_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.get_pdf_preview(source_path)
        except UnavailablePreviewType as exc:
            raise TracimUnavailablePreviewType() from exc
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for revision {}'.format(revision_id)
            ) from exc
969
970
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Expose the jpg preview size settings of the configuration.
        :return: PreviewAllowedDim built from PREVIEW_JPG_RESTRICTED_DIMS
        and PREVIEW_JPG_ALLOWED_DIMS, in that order
        """
        config = self._config
        restricted = config.PREVIEW_JPG_RESTRICTED_DIMS
        allowed_dims = config.PREVIEW_JPG_ALLOWED_DIMS
        return PreviewAllowedDim(restricted, allowed_dims)
978
979
    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Get the jpg preview of one page of a content revision.

        When neither width nor height is given, the first entry of
        PREVIEW_JPG_ALLOWED_DIMS is used as size.
        :param content_id: id of content, only used in error messages
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content; it is converted with preview_manager_page_format() before
        use
        :param width: width in pixel
        :param height: height in pixel
        :return: preview_path as string
        :raise PageOfPreviewNotFound: converted page number is not lower
        than the number of pages reported by the preview manager
        :raise PreviewDimNotAllowed: size is not one of the allowed
        dimensions while PREVIEW_JPG_RESTRICTED_DIMS is set
        :raise UnavailablePreview: preview manager raised
        UnsupportedMimeType
        """
        source_path = self.get_one_revision_filepath(revision_id)
        try:
            page_index = preview_manager_page_format(page_number)
            if page_index >= self.preview_manager.get_page_nb(source_path):
                raise PageOfPreviewNotFound(
                    'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                        page_number=page_index,
                        revision_id=revision_id,
                        content_id=content_id,
                    ),
                )

            if not width and not height:
                default_dim = self._config.PREVIEW_JPG_ALLOWED_DIMS[0]
                width = default_dim.width
                height = default_dim.height

            size_is_listed = any(
                width == dim.width and height == dim.height
                for dim in self._config.PREVIEW_JPG_ALLOWED_DIMS
            )
            # Unlisted sizes are only refused when dims are restricted
            if not size_is_listed and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
                raise PreviewDimNotAllowed(
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
                        width=width,
                        height=height,
                    )
                )

            return self.preview_manager.get_jpeg_preview(
                source_path,
                page=page_index,
                width=width,
                height=height,
            )
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            ) from exc
1035
1036
    def _get_all_query(
        self,
        parent_id: int = None,
        content_type_slug: str = content_type_list.Any_SLUG,
        workspace: Workspace = None,
        label:str = None,
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
    ) -> Query:
        """
        Build the filtered query used by the "get all data" methods.
        :param parent_id: filter by parent_id; 0 or False select contents
        without parent, None disables the filter
        :param content_type_slug: filter by content_type slug; slug aliases
        of the matching content type are accepted too
        :param workspace: filter by workspace
        :param label: when given, keep contents whose label contains this
        value (ilike comparison)
        :param order_by_properties: properties to order by, can be both
        string of attribute or attribute of Model object from sqlalchemy
        (preferred way, QueryableAttribute object)
        :return: Query object
        """
        orderings = order_by_properties or []  # FDV
        assert parent_id is None or isinstance(parent_id, int)
        assert content_type_slug is not None
        query = self._base_query(workspace)

        if content_type_slug != content_type_list.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type = content_type_list.get_one_by_slug(content_type_slug)
            accepted_slugs = [content_type.slug]
            if content_type.slug_alias:
                accepted_slugs.extend(content_type.slug_alias)
            query = query.filter(Content.type.in_(accepted_slugs))

        if parent_id:
            query = query.filter(Content.parent_id == parent_id)
        elif parent_id == 0 or parent_id is False:
            query = query.filter(Content.parent_id == None)

        if label:
            query = query.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8

        for ordering in orderings:
            query = query.order_by(ordering)

        return query
1079
1080
    def get_all(
            self,
            parent_id: int=None,
            content_type: str=content_type_list.Any_SLUG,
            workspace: Workspace=None,
            label: str=None,
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
    ) -> typing.List[Content]:
        """
        Return all contents matching the given filters, see _get_all_query
        :param parent_id: filter by parent_id
        :param content_type: filter by content_type slug
        :param workspace: filter by workspace
        :param label: filter by label
        :param order_by_properties: properties to order by, can be both
        string of attribute or attribute of Model object from sqlalchemy
        (preferred way, QueryableAttribute object)
        :return: List of contents
        """
        orderings = order_by_properties or []  # FDV
        query = self._get_all_query(
            parent_id,
            content_type,
            workspace,
            label,
            orderings,
        )
        return query.all()
1100
1101
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1102
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
1103
    #     """
1104
    #     Return parent_id childs of given content_types
1105
    #     :param parent_id: parent id
1106
    #     :param content_types: list of types
1107
    #     :param workspace: workspace filter
1108
    #     :return: list of content
1109
    #     """
1110
    #     resultset = self._base_query(workspace)
1111
    #     resultset = resultset.filter(Content.type.in_(content_types))
1112
    #
1113
    #     if parent_id:
1114
    #         resultset = resultset.filter(Content.parent_id==parent_id)
1115
    #     if parent_id is False:
1116
    #         resultset = resultset.filter(Content.parent_id == None)
1117
    #
1118
    #     return resultset.all()
1119
1120
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1121
    # TODO find an other name to filter on is_deleted / is_archived
1122
    def get_all_with_filter(self, parent_id: int=None, content_type: str=content_type_list.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
        """
        Return the contents of a parent whose deleted, archived and
        temporary flags are equal to the show_deleted, show_archived and
        show_temporary settings of this api.
        :param parent_id: parent id to match (None for no parent)
        :param content_type: content type slug, no type filter for Any_SLUG
        :param workspace: workspace given to the base query
        :return: list of contents
        """
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        query = self._base_query(workspace)
        if content_type != content_type_list.Any_SLUG:
            query = query.filter(Content.type == content_type)

        query = query \
            .filter(Content.is_deleted == self._show_deleted) \
            .filter(Content.is_archived == self._show_archived) \
            .filter(Content.is_temporary == self._show_temporary) \
            .filter(Content.parent_id == parent_id)

        return query.all()
1139
1140
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1141
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
        """
        Return every content of the base query, optionally restricted to
        one content type.
        :param content_type: content type slug, no type filter for Any_SLUG
        :param workspace: workspace given to the base query
        :return: list of contents
        """
        assert content_type is not None  # DYN_REMOVE

        query = self._base_query(workspace)
        if content_type == content_type_list.Any_SLUG:
            return query.all()
        return query.filter(Content.type == content_type).all()
1150
1151
    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        """
        Prepare the selection of the revisions not read by the current user.

        NOTE(review): as written, this method only builds a subquery and
        ends without a return statement, so callers get None instead of the
        annotated list; parent_id and content_type are only checked by
        assertions and limit is unused. A longer, commented-out variant of
        this method exists further down in this file - confirm which one is
        intended.
        :param parent_id: parent id, must be None or an int
        :param content_type: content type slug, must be a str
        :param workspace: workspace given to the revisions base query
        :param limit: unused here
        """
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        revisions_read_by_user = self._session.query(
            RevisionReadStatus.revision_id
        ).filter(RevisionReadStatus.user_id == self._user_id)

        unread_revisions = self._revisions_base_query(workspace)
        unread_revisions = unread_revisions.filter(
            ~ContentRevisionRO.revision_id.in_(revisions_read_by_user)
        )
        unread_revisions = unread_revisions.filter(
            ContentRevisionRO.workspace_id == Workspace.workspace_id
        )
        unread_revisions = unread_revisions.filter(
            Workspace.is_deleted.is_(False)
        )
        # The subquery is built but nothing consumes it afterwards
        unread_revisions.subquery()
1165
1166
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1167
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1168
    #     assert content_type is not None# DYN_REMOVE
1169
    #
1170
    #     resultset = self._base_query(workspace)
1171
    #
1172
    #     if content_type != content_type_list.Any_SLUG:
1173
    #         resultset = resultset.filter(Content.type==content_type)
1174
    #
1175
    #     return resultset.all()
1176
1177
    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]= None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        Get contents sorted by last update, most recent first. The update
        of a comment counts as an update of its parent content, which is
        the one listed.
        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: when given, only contents coming after this
        one in the sorted results are returned
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content, without duplicates
        """
        query = self._get_all_query(workspace=workspace)
        if content_ids:
            is_selected_content = Content.content_id.in_(content_ids)
            is_comment_of_selected = and_(
                Content.parent_id.in_(content_ids),
                Content.type == content_type_list.Comment.slug
            )
            query = query.filter(
                or_(is_selected_content, is_comment_of_selected)
            )
        query = query.order_by(desc(Content.updated))

        selected = []
        # Contents met before (or being) before_content: kept only to
        # avoid listing them later through one of their older comments
        skipped = []
        reference_reached = False
        for candidate in query:
            if content_type_list.Comment.slug == candidate.type:
                target = candidate.parent
            else:
                target = candidate

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            if not self._show_deleted and target.is_deleted:
                continue
            if not self._show_archived and target.is_archived:
                continue

            if target not in selected and target not in skipped:
                if before_content and not reference_reached:
                    skipped.append(target)
                else:
                    selected.append(target)

                if before_content and target == before_content:
                    reference_reached = True

            if limit and len(selected) >= limit:
                break

        return selected
1243
1244
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1245
    # usable and efficient for tracim v2 to get content with read/unread status
1246
    # instead of relying on has_new_information_for()
1247
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1248
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1249
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1250
    #     assert content_type is not None# DYN_REMOVE
1251
    #     assert isinstance(content_type, str) # DYN_REMOVE
1252
    #
1253
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1254
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1255
    #
1256
    #     not_read_revisions = self._revisions_base_query(workspace) \
1257
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1258
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1259
    #         .filter(Workspace.is_deleted.is_(False)) \
1260
    #         .subquery()
1261
    #
1262
    #     not_read_content_ids_query = self._session.query(
1263
    #         distinct(not_read_revisions.c.content_id)
1264
    #     )
1265
    #     not_read_content_ids = list(map(
1266
    #         itemgetter(0),
1267
    #         not_read_content_ids_query,
1268
    #     ))
1269
    #
1270
    #     not_read_contents = self._base_query(workspace) \
1271
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1272
    #         .order_by(desc(Content.updated))
1273
    #
1274
    #     if content_type != content_type_list.Any_SLUG:
1275
    #         not_read_contents = not_read_contents.filter(
1276
    #             Content.type==content_type)
1277
    #     else:
1278
    #         not_read_contents = not_read_contents.filter(
1279
    #             Content.type!=content_type_list.Folder.slug)
1280
    #
1281
    #     if parent_id:
1282
    #         not_read_contents = not_read_contents.filter(
1283
    #             Content.parent_id==parent_id)
1284
    #
1285
    #     result = []
1286
    #     for item in not_read_contents:
1287
    #         new_item = None
1288
    #         if content_type_list.Comment.slug == item.type:
1289
    #             new_item = item.parent
1290
    #         else:
1291
    #             new_item = item
1292
    #
1293
    #         # INFO - D.A. - 2015-05-20
1294
    #         # We do not want to show only one item if the last 10 items are
1295
    #         # comments about one thread for example
1296
    #         if new_item not in result:
1297
    #             result.append(new_item)
1298
    #
1299
    #         if len(result) >= limit:
1300
    #             break
1301
    #
1302
    #     return result
1303
1304
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        Store the allowed content dict in the properties of a content.

        The properties are copied, updated, then assigned back as a whole.
        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True,
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        new_properties = content.properties.copy()
        new_properties.update(allowed_content=allowed_content_dict)
        content.properties = new_properties
1319
1320
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        Define which content types are accepted as subcontent of a content.
        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :raise ContentTypeNotExist: one of the slugs is not in
        content_type_list.endpoint_allowed_types_slug()
        :return: nothing
        """
        accepted = {}
        for slug in allowed_content_type_slug_list:
            known_slugs = content_type_list.endpoint_allowed_types_slug()
            if slug in known_slugs:
                accepted[slug] = True
                continue
            raise ContentTypeNotExist('Content_type {} does not exist'.format(slug))  # nopep8

        self._set_allowed_content(content, accepted)
1334
1335
    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Drop the explicit 'allowed_content' entry from the properties of a
        content, so that default allowed content types apply again.

        Nothing is changed when the raw _properties value is empty or does
        not contain 'allowed_content'.
        :param content: the given content instance
        :return: nothing
        """
        raw_properties = content._properties
        if not raw_properties or 'allowed_content' not in raw_properties:
            return

        remaining = content.properties.copy()
        remaining.pop('allowed_content')
        content.properties = remaining
1345
1346
    def set_status(self, content: Content, new_status: str):
        """
        Change the status of a content and flag the change as a status
        update.
        :param content: the given content instance
        :param new_status: status slug, must be one of
        content_status_list.get_all_slugs_values()
        :raise ValueError: new_status is not an allowed value
        """
        if new_status not in content_status_list.get_all_slugs_values():
            raise ValueError('The given value {} is not allowed'.format(new_status))

        content.status = new_status
        content.revision_type = ActionDescription.STATUS_UPDATE
1352
1353
    def move(self,
             item: Content,
             new_parent: Content,
             must_stay_in_same_workspace: bool=True,
             new_workspace: Workspace=None,
    ):
        """
        Move a content under a new parent and/or into a new workspace.

        All workspace consistency checks run before the item is modified,
        so a refused move leaves parent and workspace of the item untouched.
        :param item: content to move
        :param new_parent: new parent of the item, None for no parent
        :param must_stay_in_same_workspace: when True, refuse a new parent
        located in another workspace than the item
        :param new_workspace: workspace to move the item to; when not given,
        the workspace of the new parent (if any) is used
        :raise ValueError: item must stay in its workspace but the new
        parent is in another one
        :raise WorkspacesDoNotMatch: new parent is not in the new workspace
        """
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        # Checked before any assignment: this only depends on the arguments,
        # and raising after the assignments left the item half-moved
        if new_workspace and new_parent and \
                new_parent.workspace_id != new_workspace.workspace_id:
            raise WorkspacesDoNotMatch(
                'new parent workspace and new workspace should be the same.'
            )

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
        elif new_parent:
            item.workspace = new_parent.workspace

        # Checked on the updated item: the file name must be free in the
        # destination, the item itself being excluded from the check
        self._is_filename_available_or_raise(
            item.file_name,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.revision_type = ActionDescription.MOVE
1382
1383
    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revision included. Children not included, see
        "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item; when not
        given, the copy stays under the parent of the original item
        :param new_label: new label of the new copied item; when not given,
        the label of the original item is kept
        :param do_save: save the copied item or not
        :param do_notify: notify copy or not
        :raise ValueError: neither a new parent nor a new label is given, or
        both are the same as those of the original item
        :return: Newly copied item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something else than value error
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label
        filename = self._prepare_filename(label, item.file_extension)
        self._is_filename_available_or_raise(filename, workspace, parent)
        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.file_name = filename
            rev.revision_type = ActionDescription.COPY
            # Copy, edit, then assign the properties back as a whole, like
            # the allowed_content helpers of this class do, so the change
            # goes through the properties setter.
            # NOTE(review): presumably an in-place edit of the dict returned
            # by rev.properties is not persisted - confirm against the model
            properties = rev.properties.copy()
            properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
            rev.properties = properties
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content
1431
1432
    def copy_children(self, origin_content: Content, new_content: Content):
        """
        Copy each direct child of a content under another content.
        Every child is copied through self.copy(), with default values for
        the label, save and notification parameters.
        :param origin_content: content whose children are copied
        :param new_content: content used as parent of the copied children
        """
        children = origin_content.children
        for origin_child in children:
            self.copy(origin_child, new_parent=new_content)
1435
1436
    def move_recursively(self, item: Content,
                         new_parent: Content, new_workspace: Workspace):
        """
        Move a content, then move all of its children the same way.
        The item is moved and saved (without notification) first; each child
        is then processed inside its own new_revision context, with the item
        itself given as parent.
        :param item: content to move
        :param new_parent: parent given to self.move() for item
        :param new_workspace: workspace given to self.move() for item and
        for every descendant
        """
        self.move(item, new_parent, False, new_workspace)
        self.save(item, do_notify=False)

        for sub_content in item.children:
            revision_context = new_revision(
                session=self._session,
                tm=transaction.manager,
                content=sub_content,
            )
            with revision_context:
                self.move_recursively(sub_content, item, new_workspace)
1449
1450
    def is_editable(self, item: Content) -> bool:
        """
        Tell if a content can be modified.
        A content is editable when it is not readonly, is active and has a
        status which is itself editable.
        :param item: content to check
        :return: True if the content can be modified
        """
        if item.is_readonly:
            return False
        return item.is_active and item.get_status().is_editable()
1454
1455
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        """
        Update label and description of a content. The content is modified
        but not saved by this method.
        :param item: content to update
        :param new_label: new label of the content, can't be empty
        :param new_content: new description of the content; if empty, the
        current description is kept
        :return: updated content
        :raise ContentInNotEditableState: if content is not editable
        :raise SameValueError: if both label and description are unchanged
        :raise EmptyLabelNotAllowed: if new_label is empty
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8
        nothing_changed = item.label == new_label \
            and item.description == new_content
        if nothing_changed:
            # TODO - G.M - 20-03-2018 - Fix internationalization for webdav
            # access. Internationalization disabled in libcontent for now.
            raise SameValueError('The content did not changed')
        if not new_label:
            raise EmptyLabelNotAllowed()

        # INFO - new_label can't be empty from here, so it is used directly
        # to build the filename.
        filename = self._prepare_filename(new_label, item.file_extension)
        self._is_filename_available_or_raise(
            filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id,
        )

        # TODO: convert urls into links
        if new_content:
            description = new_content
        else:
            description = item.description

        item.owner = self._user
        item.label = new_label
        item.description = description
        item.revision_type = ActionDescription.EDITION
        return item
1479
1480
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        """
        Replace file name, mimetype and file data of a content. The content
        is modified but not saved by this method.
        :param item: content to update
        :param new_filename: new file name, must be available in the
        workspace/parent of the content
        :param new_mimetype: new mimetype of the file
        :param new_content: new data of the file
        :return: updated content
        :raise ContentInNotEditableState: if content is not editable
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8
        # FIXME - G.M - 2018-09-25 - The "same content" check is disabled and
        # needs a better implementation. According to the original note,
        # pyramid gives a buffered object, so comparing new_content with
        # item.depot_file.file.read() never matches, and comparing with
        # read() on both sides may be a problem with big files.
        # Once repaired, SameValueError('The content did not changed') should
        # be raised when both mimetype and file data are unchanged.
        item.owner = self._user
        self._is_filename_available_or_raise(
            new_filename,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id,
        )
        file_intent = FileIntent(new_content, new_filename, new_mimetype)
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        item.depot_file = file_intent
        item.revision_type = ActionDescription.REVISION
        return item
1508
1509
    def archive(self, content: Content):
        """
        Flag a content as archived and rename its file with an
        "<label>-archived-<date>" pattern. The content is modified but not
        saved by this method.
        :param content: content to archive
        """
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        name_parts = {
            'label': content.label,
            'action': 'archived',
            'date': current_date_for_filename(),
        }
        archived_label = '{label}-{action}-{date}'.format(**name_parts)
        archived_filename = self._prepare_filename(
            archived_label,
            content.file_extension,
        )
        self._is_filename_available_or_raise(
            archived_filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id,
        )
        content.file_name = archived_filename
        content.revision_type = ActionDescription.ARCHIVING
1529
1530
    def unarchive(self, content: Content):
        """
        Remove the archived flag of a content. Only owner, archived flag and
        revision type are changed here: the file name is left untouched.
        The content is modified but not saved by this method.
        :param content: content to unarchive
        """
        content.owner = self._user
        content.revision_type = ActionDescription.UNARCHIVING
        content.is_archived = False
1534
1535
    def delete(self, content: Content):
        """
        Flag a content as deleted and rename its file with a
        "<label>-deleted-<date>" pattern. The content is modified but not
        saved by this method.
        :param content: content to delete
        """
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        name_parts = {
            'label': content.label,
            'action': 'deleted',
            'date': current_date_for_filename(),
        }
        deleted_label = '{label}-{action}-{date}'.format(**name_parts)
        deleted_filename = self._prepare_filename(
            deleted_label,
            content.file_extension,
        )
        self._is_filename_available_or_raise(
            deleted_filename,
            content.workspace,
            content.parent,
            exclude_content_id=content.content_id,
        )
        content.file_name = deleted_filename
        content.revision_type = ActionDescription.DELETION
1555
1556
    def undelete(self, content: Content):
        """
        Remove the deleted flag of a content. Only owner, deleted flag and
        revision type are changed here: the file name is left untouched.
        The content is modified but not saved by this method.
        :param content: content to undelete
        """
        content.owner = self._user
        content.revision_type = ActionDescription.UNDELETION
        content.is_deleted = False
1560
1561
    def get_preview_page_nb(self, revision_id: int) -> typing.Optional[int]:
        """
        Get the number of preview pages of the file of a revision.
        :param revision_id: id of the revision
        :return: number of pages given by the preview manager, or None if
        the preview manager raises UnsupportedMimeType for this file
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.get_page_nb(revision_file_path)
        except UnsupportedMimeType:
            return None
1568
1569
    def has_pdf_preview(self, revision_id: int) -> bool:
        """
        Tell if a pdf preview exists for the file of a revision.
        :param revision_id: id of the revision
        :return: answer of the preview manager, or False if the preview
        manager raises UnsupportedMimeType for this file
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.has_pdf_preview(revision_file_path)
        except UnsupportedMimeType:
            return False
1576
1577
    def mark_read__all(
            self,
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Read content of all workspace visible for the user.
        This delegates to mark_read__workspace() with no workspace given.
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive) # nopep8
1591
1592
    def mark_read__workspace(
            self,
            workspace: typing.Optional[Workspace],
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Read content of a workspace visible for the user.
        Only contents returned by get_last_active() which have new
        information for the current user are marked as read.
        :param workspace: workspace given to get_last_active();
        mark_read__all() gives None here to cover all workspaces
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        for item in self.get_last_active(workspace):
            if item.has_new_information_for(self._user):
                self.mark_read(item, read_datetime, do_flush, recursive)
1609
1610
    def mark_read(
            self,
            content: Content,
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> Content:
        """
        Read content for the user.
        :param content: content to mark as read
        :param read_datetime: date of reading, current date if not given
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: the content given as parameter
        """
        assert self._user
        assert content

        # The algorithm is:
        # 1. define the read datetime
        # 2. update all revisions related to current Content
        # 3. do the same for all child revisions
        #    (ie parent_id is content_id of current content)

        if not read_datetime:
            read_datetime = datetime.datetime.now()

        viewed_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id == content.content_id).all()

        for revision in viewed_revisions:
            revision.read_by[self._user] = read_datetime

        if recursive:
            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            for child in content.get_valid_children():
                # INFO - flush is done only once, at the end of the
                # top-level call.
                self.mark_read(child, read_datetime=read_datetime,
                               do_flush=False)

            if content_type_list.Comment.slug == content.type:
                # INFO - recursive=False here: parent and sibling comments
                # must not trigger the recursion again.
                self.mark_read(content.parent, read_datetime=read_datetime,
                               do_flush=False, recursive=False)
                for comment in content.parent.get_comments():
                    if comment != content:
                        self.mark_read(comment, read_datetime=read_datetime,
                                       do_flush=False, recursive=False)

        if do_flush:
            self.flush()

        return content
1664
1665
    def mark_unread(self, content: Content, do_flush=True) -> Content:
        """
        Mark a content and its valid children as unread for the user.
        :param content: content to mark as unread
        :param do_flush: flush database
        :return: the content given as parameter
        """
        assert self._user
        assert content

        revision_query = self._session.query(ContentRevisionRO)
        revision_query = revision_query.filter(
            ContentRevisionRO.content_id == content.content_id
        )

        for revision in revision_query.all():
            try:
                del revision.read_by[self._user]
            except KeyError:
                # INFO - no read entry of the user for this revision:
                # nothing to remove.
                pass

        for sub_content in content.get_valid_children():
            # INFO - flush is done only once, at the end of the
            # top-level call.
            self.mark_unread(sub_content, do_flush=False)

        if do_flush:
            self.flush()

        return content
1685
1686
    def flush(self):
        """
        Flush the database session used by this api.
        """
        session = self._session
        session.flush()
1688
1689
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set, must be one of
        ActionDescription.allowed_values() if given
        :param do_flush: add content to session, flush it and mark the
        content as read
        :param do_notify: call do_notify() for the content
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        if not action_description:
            # See if the last action has been modified
            # NOTE(review): get_history() is documented as returning a
            # 3-tuple, so the len() test looks always false and only the
            # None check seems effective - to confirm before any change.
            no_revision_type = content.revision_type is None
            if no_revision_type or len(get_history(content.revision, 'revision_type')) <= 0:
                # The action has not been modified, so we set it to default
                # edition
                action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flush because of the use
            # of triggers for content creation
            #
            # (when creating a content, actually this is an insert of a new
            # revision in content_revisions ; so the mark_read operation need
            # to get full real data from database before to be prepared.
            session = self._session
            session.add(content)
            session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of
            # Content. This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)
1726
1727
    def do_notify(self, content: Content):
        """
        Allow to force notification for a given content. By default, it is
        called during the .save() operation
        :param content: content whose update is notified
        :return: nothing
        """
        notifier = NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        )
        notifier.notify_content_update(content)
1739
1740
    def get_keywords(self, search_string, search_string_separators=None) -> typing.List[str]:
        """
        Split a search string into keywords.
        Keywords are stripped, and empty keywords (two separators in a row,
        leading or trailing separator) are dropped: an empty keyword used in
        a '%keyword%' pattern would match everything.
        :param search_string: a list of separated keywords
        :param search_string_separators: regular expression used to split
        search_string, ContentApi.SEARCH_SEPARATORS if not given
        :return: a list of str (each keyword = 1 entry)
        """
        if not search_string:
            return []

        separators = search_string_separators or ContentApi.SEARCH_SEPARATORS
        stripped_keywords = (
            keyword.strip() for keyword in re.split(separators, search_string)
        )
        return [keyword for keyword in stripped_keywords if keyword]
1753
1754
    def search(self, keywords: typing.List[str]) -> typing.Optional[Query]:
        """
        Build the query of contents whose label or description contains at
        least one of the given keywords (case insensitive match).
        No ordering is added to the query by this method.
        :param keywords: list of keywords to search
        :return: query of matching contents, or None if no keyword is given
        """
        # INFO - "not keywords" also covers None, which len() can't handle.
        if not keywords:
            return None

        label_filters = [
            Content.label.ilike('%{}%'.format(keyword))
            for keyword in keywords
        ]
        description_filters = [
            Content.description.ilike('%{}%'.format(keyword))
            for keyword in keywords
        ]
        title_keyworded_items = self._hard_filtered_base_query() \
            .filter(or_(*(label_filters + description_filters))) \
            .options(joinedload('children_revisions')) \
            .options(joinedload('parent'))

        return title_keyworded_items
1770
1771
    def get_all_types(self) -> typing.List[ContentType]:
        """
        Get the content types allowed for endpoints.
        :return: one ContentType for each slug given by
        content_type_list.endpoint_allowed_types_slug(), in the same order
        """
        allowed_slugs = content_type_list.endpoint_allowed_types_slug()
        return [
            content_type_list.get_one_by_slug(slug)
            for slug in allowed_slugs
        ]
1778
1779
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1780
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Remove from the given list the contents which are under a deleted or
        an archived content. The list is modified in place.
        :param contents: List of contents to parse
        :return: the same list object, once filtered
        """
        # INFO - loop on a copy, as items are removed from the list itself.
        for candidate in list(contents):
            is_unavailable = self.content_under_deleted(candidate) \
                or self.content_under_archived(candidate)
            if is_unavailable:
                contents.remove(candidate)
        return contents
1792
1793
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1794
    def content_under_deleted(self, content: Content) -> bool:
        """
        Tell if one of the ancestors (parent, parent of parent, ...) of a
        content is deleted. The state of the content itself is not checked.
        :param content: content to check
        :return: True if an ancestor is deleted
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_deleted:
                return True
            ancestor = ancestor.parent
        return False
1801
1802
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1803
    def content_under_archived(self, content: Content) -> bool:
        """
        Tell if one of the ancestors (parent, parent of parent, ...) of a
        content is archived. The state of the content itself is not checked.
        :param content: content to check
        :return: True if an ancestor is archived
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_archived:
                return True
            ancestor = ancestor.parent
        return False
1810
1811
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1812
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return the Content whose properties contain the given name/value.
        The search is a textual LIKE match of '"name": "value"' on the
        serialized properties.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.
        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace which contains the Content
        :return: Found Content
        """
        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        like_pattern = '%"{property_name}": "{property_value}"%'.format(
            property_name=property_name,
            property_value=property_value,
        )
        query = self._base_query(workspace=workspace)
        query = query.filter(Content._properties.like(like_pattern))
        return query.one()
1839
1840
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1841
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a folder label, numbered from the count of contents whose
        label already starts with the translated 'New folder'.
        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None)
        :return: Generated folder name
        """
        _ = self.translator.get_translation
        label_prefix_pattern = '{0}%'.format(_('New folder'))
        query = self._base_query(workspace=workspace)
        query = query.filter(Content.label.ilike(label_prefix_pattern))
        if parent:
            query = query.filter(Content.parent == parent)

        next_number = query.count() + 1
        return _('New folder {0}').format(next_number)
1863