Passed
Pull Request — develop (#87)
by inkhey
01:44
created

ContentApi.show()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 27
rs 9.55
c 0
b 0
f 0
cc 1
nop 4
1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import typing
6
from contextlib import contextmanager
7
8
import sqlalchemy
9
import transaction
10
from depot.io.utils import FileIntent
11
from depot.manager import DepotManager
12
from preview_generator.exception import UnsupportedMimeType
13
from preview_generator.manager import PreviewManager
14
from sqlalchemy import desc
15
from sqlalchemy import func
16
from sqlalchemy import or_
17
from sqlalchemy.orm import Query
18
from sqlalchemy.orm import aliased
19
from sqlalchemy.orm import joinedload
20
from sqlalchemy.orm.attributes import QueryableAttribute
21
from sqlalchemy.orm.attributes import get_history
22
from sqlalchemy.orm.exc import NoResultFound
23
from sqlalchemy.orm.session import Session
24
from sqlalchemy.sql.elements import and_
25
26
from tracim_backend.app_models.contents import CONTENT_STATUS
27
from tracim_backend.app_models.contents import CONTENT_TYPES
28
from tracim_backend.app_models.contents import FOLDER_TYPE
29
from tracim_backend.app_models.contents import ContentStatus
30
from tracim_backend.app_models.contents import ContentType
31
from tracim_backend.app_models.contents import GlobalStatus
32
from tracim_backend.exceptions import ContentClosed
33
from tracim_backend.exceptions import ContentLabelAlreadyUsedHere
34
from tracim_backend.exceptions import ContentNotFound
35
from tracim_backend.exceptions import ContentTypeNotExist
36
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
37
from tracim_backend.exceptions import EmptyLabelNotAllowed
38
from tracim_backend.exceptions import PageOfPreviewNotFound
39
from tracim_backend.exceptions import PreviewDimNotAllowed
40
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
41
from tracim_backend.exceptions import SameValueError
42
from tracim_backend.exceptions import UnallowedSubContent
43
from tracim_backend.exceptions import UnavailablePreview
44
from tracim_backend.exceptions import WorkspacesDoNotMatch
45
from tracim_backend.lib.core.notifications import NotifierFactory
46
from tracim_backend.lib.utils.logger import logger
47
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
48
from tracim_backend.lib.utils.translation import Translator
49
from tracim_backend.lib.utils.utils import cmp_to_key
50
from tracim_backend.lib.utils.utils import current_date_for_filename
51
from tracim_backend.lib.utils.utils import preview_manager_page_format
52
from tracim_backend.models.auth import User
53
from tracim_backend.models.context_models import ContentInContext
54
from tracim_backend.models.context_models import PreviewAllowedDim
55
from tracim_backend.models.context_models import RevisionInContext
56
from tracim_backend.models.data import ActionDescription
57
from tracim_backend.models.data import Content
58
from tracim_backend.models.data import ContentRevisionRO
59
from tracim_backend.models.data import NodeTreeItem
60
from tracim_backend.models.data import RevisionReadStatus
61
from tracim_backend.models.data import UserRoleInWorkspace
62
from tracim_backend.models.data import Workspace
63
from tracim_backend.models.revision_protection import new_revision
64
65
__author__ = 'damien'
66
67
68
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
69
def compare_content_for_sorting_by_type_and_name(
70
        content1: Content,
71
        content2: Content
72
) -> int:
73
    """
74
    :param content1:
75
    :param content2:
76
    :return:    1 if content1 > content2
77
                -1 if content1 < content2
78
                0 if content1 = content2
79
    """
80
81
    if content1.type == content2.type:
82
        if content1.get_label().lower()>content2.get_label().lower():
83
            return 1
84
        elif content1.get_label().lower()<content2.get_label().lower():
85
            return -1
86
        return 0
87
    else:
88
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
89
        content_type_order = [
90
            CONTENT_TYPES.Folder.slug,
91
            CONTENT_TYPES.Page.slug,
92
            CONTENT_TYPES.Thread.slug,
93
            CONTENT_TYPES.File.slug,
94
        ]
95
96
        content_1_type_index = content_type_order.index(content1.type)
97
        content_2_type_index = content_type_order.index(content2.type)
98
        result = content_1_type_index - content_2_type_index
99
100
        if result < 0:
101
            return -1
102
        elif result > 0:
103
            return 1
104
        else:
105
            return 0
106
107
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
108
def compare_tree_items_for_sorting_by_type_and_name(
109
        item1: NodeTreeItem,
110
        item2: NodeTreeItem
111
) -> int:
112
    return compare_content_for_sorting_by_type_and_name(item1.node, item2.node)
113
114
115
class ContentApi(object):
116
117
    SEARCH_SEPARATORS = ',| '
118
    SEARCH_DEFAULT_RESULT_NB = 50
119
120
    # DISPLAYABLE_CONTENTS = (
121
    #     CONTENT_TYPES.Folder.slug,
122
    #     CONTENT_TYPES.File.slug,
123
    #     CONTENT_TYPES.Comment.slug,
124
    #     CONTENT_TYPES.Thread.slug,
125
    #     CONTENT_TYPES.Page.slug,
126
    #     CONTENT_TYPES.Page.slugLegacy,
127
    #     ContentType.MarkdownPage,
128
    # )
129
130
    def __init__(
131
            self,
132
            session: Session,
133
            current_user: typing.Optional[User],
134
            config,
135
            show_archived: bool = False,
136
            show_deleted: bool = False,
137
            show_temporary: bool = False,
138
            show_active: bool = True,
139
            all_content_in_treeview: bool = True,
140
            force_show_all_types: bool = False,
141
            disable_user_workspaces_filter: bool = False,
142
    ) -> None:
143
        self._session = session
144
        self._user = current_user
145
        self._config = config
146
        self._user_id = current_user.user_id if current_user else None
147
        self._show_archived = show_archived
148
        self._show_deleted = show_deleted
149
        self._show_temporary = show_temporary
150
        self._show_active = show_active
151
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
152
        self._force_show_all_types = force_show_all_types
153
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
154
        self.preview_manager = PreviewManager(self._config.PREVIEW_CACHE_DIR, create_folder=True)  # nopep8
155
        default_lang = None
156
        if self._user:
157
            default_lang = self._user.lang
158
        if not default_lang:
159
            default_lang = DEFAULT_FALLBACK_LANG
160
        self.translator = Translator(app_config=self._config, default_lang=default_lang)  # nopep8
161
162
    @contextmanager
163
    def show(
164
            self,
165
            show_archived: bool=False,
166
            show_deleted: bool=False,
167
            show_temporary: bool=False,
168
    ) -> typing.Generator['ContentApi', None, None]:
169
        """
170
        Use this method as context manager to update show_archived,
171
        show_deleted and show_temporary properties during context.
172
        :param show_archived: show archived contents
173
        :param show_deleted:  show deleted contents
174
        :param show_temporary:  show temporary contents
175
        """
176
        previous_show_archived = self._show_archived
177
        previous_show_deleted = self._show_deleted
178
        previous_show_temporary = self._show_temporary
179
180
        try:
181
            self._show_archived = show_archived
182
            self._show_deleted = show_deleted
183
            self._show_temporary = show_temporary
184
            yield self
185
        finally:
186
            self._show_archived = previous_show_archived
187
            self._show_deleted = previous_show_deleted
188
            self._show_temporary = previous_show_temporary
189
190
    def get_content_in_context(self, content: Content) -> ContentInContext:
191
        return ContentInContext(content, self._session, self._config, self._user)  # nopep8
192
193
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
194
        # TODO - G.M - 2018-06-173 - create revision in context object
195
        return RevisionInContext(revision, self._session, self._config, self._user) # nopep8
196
    
197
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
198
        """
199
        Return the Content/ContentRevision query join condition
200
        :return: Content/ContentRevision query join condition
201
        """
202
        return and_(Content.id == ContentRevisionRO.content_id,
203
                    ContentRevisionRO.revision_id == self._session.query(
204
                        ContentRevisionRO.revision_id)
205
                    .filter(ContentRevisionRO.content_id == Content.id)
206
                    .order_by(ContentRevisionRO.revision_id.desc())
207
                    .limit(1)
208
                    .correlate(Content))
209
210
    def get_canonical_query(self) -> Query:
211
        """
212
        Return the Content/ContentRevision base query who join these table on the last revision.
213
        :return: Content/ContentRevision Query
214
        """
215
        return self._session.query(Content)\
216
            .join(ContentRevisionRO, self._get_revision_join())
217
218
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
219
    @classmethod
220
    def sort_tree_items(
221
        cls,
222
        content_list: typing.List[NodeTreeItem],
223
    )-> typing.List[NodeTreeItem]:
224
        news = []
225
        for item in content_list:
226
            news.append(item)
227
228
        content_list.sort(key=cmp_to_key(
229
            compare_tree_items_for_sorting_by_type_and_name,
230
        ))
231
232
        return content_list
233
234
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
235
    @classmethod
236
    def sort_content(
237
        cls,
238
        content_list: typing.List[Content],
239
    ) -> typing.List[Content]:
240
        content_list.sort(key=cmp_to_key(compare_content_for_sorting_by_type_and_name))
241
        return content_list
242
243
    def __real_base_query(
244
        self,
245
        workspace: Workspace = None,
246
    ) -> Query:
247
        result = self.get_canonical_query()
248
249
        # Exclude non displayable types
250
        if not self._force_show_all_types:
251
            result = result.filter(Content.type.in_(CONTENT_TYPES.query_allowed_types_slugs()))
252
253
        if workspace:
254
            result = result.filter(Content.workspace_id == workspace.workspace_id)
255
256
        # Security layer: if user provided, filter
257
        # with user workspaces privileges
258
        if self._user and not self._disable_user_workspaces_filter:
259
            user = self._session.query(User).get(self._user_id)
260
            # Filter according to user workspaces
261
            workspace_ids = [r.workspace_id for r in user.roles \
262
                             if r.role>=UserRoleInWorkspace.READER]
263
            result = result.filter(or_(
264
                Content.workspace_id.in_(workspace_ids),
265
                # And allow access to non workspace document when he is owner
266
                and_(
267
                    Content.workspace_id == None,
268
                    Content.owner_id == self._user_id,
269
                )
270
            ))
271
272
        return result
273
274
    def _base_query(self, workspace: Workspace=None) -> Query:
275
        result = self.__real_base_query(workspace)
276
277
        if not self._show_active:
278
            result = result.filter(or_(
279
                Content.is_deleted==True,
280
                Content.is_archived==True,
281
            ))
282
        if not self._show_deleted:
283
            result = result.filter(Content.is_deleted==False)
284
285
        if not self._show_archived:
286
            result = result.filter(Content.is_archived==False)
287
288
        if not self._show_temporary:
289
            result = result.filter(Content.is_temporary==False)
290
291
        return result
292
293
    def __revisions_real_base_query(
294
        self,
295
        workspace: Workspace=None,
296
    ) -> Query:
297
        result = self._session.query(ContentRevisionRO)
298
299
        # Exclude non displayable types
300
        if not self._force_show_all_types:
301
            result = result.filter(Content.type.in_(self.DISPLAYABLE_CONTENTS))
302
303
        if workspace:
304
            result = result.filter(ContentRevisionRO.workspace_id==workspace.workspace_id)
305
306
        if self._user:
307
            user = self._session.query(User).get(self._user_id)
308
            # Filter according to user workspaces
309
            workspace_ids = [r.workspace_id for r in user.roles \
310
                             if r.role>=UserRoleInWorkspace.READER]
311
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))
312
313
        return result
314
315
    def _revisions_base_query(
316
        self,
317
        workspace: Workspace=None,
318
    ) -> Query:
319
        result = self.__revisions_real_base_query(workspace)
320
321
        if not self._show_deleted:
322
            result = result.filter(ContentRevisionRO.is_deleted==False)
323
324
        if not self._show_archived:
325
            result = result.filter(ContentRevisionRO.is_archived==False)
326
327
        if not self._show_temporary:
328
            result = result.filter(Content.is_temporary==False)
329
330
        return result
331
332
    def _hard_filtered_base_query(
333
        self,
334
        workspace: Workspace=None,
335
    ) -> Query:
336
        """
337
        If set to True, then filterign on is_deleted and is_archived will also
338
        filter parent properties. This is required for search() function which
339
        also search in comments (for example) which may be 'not deleted' while
340
        the associated content is deleted
341
342
        :param hard_filtering:
343
        :return:
344
        """
345
        result = self.__real_base_query(workspace)
346
347
        if not self._show_deleted:
348
            parent = aliased(Content)
349
            result = result.join(parent, Content.parent).\
350
                filter(Content.is_deleted==False).\
351
                filter(parent.is_deleted==False)
352
353
        if not self._show_archived:
354
            parent = aliased(Content)
355
            result = result.join(parent, Content.parent).\
356
                filter(Content.is_archived==False).\
357
                filter(parent.is_archived==False)
358
359
        if not self._show_temporary:
360
            parent = aliased(Content)
361
            result = result.join(parent, Content.parent). \
362
                filter(Content.is_temporary == False). \
363
                filter(parent.is_temporary == False)
364
365
        return result
366
367
    def get_base_query(
368
        self,
369
        workspace: Workspace,
370
    ) -> Query:
371
        return self._base_query(workspace)
372
373
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
374
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
375
    #     """
376
    #     This method returns child items (folders or items) for left bar treeview.
377
    # 
378
    #     :param parent:
379
    #     :param workspace:
380
    #     :param filter_by_allowed_content_types:
381
    #     :param removed_item_ids:
382
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
383
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
384
    #     :return:
385
    #     """
386
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
387
    #     removed_item_ids = removed_item_ids or []  # FDV
388
    # 
389
    #     if not allowed_node_types:
390
    #         allowed_node_types = [CONTENT_TYPES.Folder.slug]
391
    #     elif allowed_node_types==CONTENT_TYPES.Any_SLUG:
392
    #         allowed_node_types = ContentType.all()
393
    # 
394
    #     parent_id = parent.content_id if parent else None
395
    #     folders = self._base_query(workspace).\
396
    #         filter(Content.parent_id==parent_id).\
397
    #         filter(Content.type.in_(allowed_node_types)).\
398
    #         filter(Content.content_id.notin_(removed_item_ids)).\
399
    #         all()
400
    # 
401
    #     if not filter_by_allowed_content_types or \
402
    #                     len(filter_by_allowed_content_types)<=0:
403
    #         # Standard case for the left treeview: we want to show all contents
404
    #         # in the left treeview... so we still filter because for example
405
    #         # comments must not appear in the treeview
406
    #         return [folder for folder in folders \
407
    #                 if folder.type in ContentType.allowed_types_for_folding()]
408
    # 
409
    #     # Now this is a case of Folders only (used for moving content)
410
    #     # When moving a content, you must get only folders that allow to be filled
411
    #     # with the type of content you want to move
412
    #     result = []
413
    #     for folder in folders:
414
    #         for allowed_content_type in filter_by_allowed_content_types:
415
    # 
416
    #             is_folder = folder.type == CONTENT_TYPES.Folder.slug
417
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
418
    # 
419
    #             if is_folder and content_type__allowed:
420
    #                 result.append(folder)
421
    #                 break
422
    # 
423
    #     return result
424
425
    def _is_content_label_free(
426
            self,
427
            label: str,
428
            workspace: Workspace,
429
            parent: Content = None,
430
            exclude_content_id: int = None,
431
    ) -> bool:
432
        """
433
        Check if content label is free
434
        :param label: content label
435
        :param workspace: workspace of the content
436
        :param parent: parent of the content
437
        :param exclude_content_id: exclude a specific content_id (useful
438
        to verify  other content for content update)
439
        :return: True if content label is available
440
        """
441
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
442
        # with empty label like comment.
443
        assert label
444
        assert workspace
445
        query = self.get_base_query(workspace)
446
447
        if parent:
448
            query = query.filter(Content.parent_id == parent.content_id)
449
        else:
450
            query = query.filter(Content.parent_id == None)
451
452
        if exclude_content_id:
453
            query = query.filter(Content.content_id != exclude_content_id)
454
        query = query.filter(Content.workspace_id == workspace.workspace_id)
455
456
        nb_content_with_the_label = query.filter(Content.label == label).count()
457
        if nb_content_with_the_label == 0:
458
            return True
459
        elif nb_content_with_the_label == 1:
460
            return False
461
        else:
462
            critical_error_text = 'Something is wrong in the database ! '\
463
                                  'Content label should be unique ' \
464
                                  'in a same folder in database' \
465
                                  'but you have {nb} content with ' \
466
                                  'label {label} in workspace {workspace_id}'
467
468
            critical_error_text = critical_error_text.format(
469
                nb=nb_content_with_the_label,
470
                label=label,
471
                workspace_id=workspace.workspace_id,
472
            )
473
            if parent:
474
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
475
                    text=critical_error_text,
476
                    parent_id=parent.parent_id
477
                )
478
            logger.critical(self, critical_error_text)
479
            return False
480
481
    def _is_content_label_free_or_raise(
482
        self,
483
        label: str,
484
        workspace: Workspace,
485
        parent: Content = None,
486
        exclude_content_id: int = None,
487
    ) -> None:
488
        """
489
        Same as _is_content_label_free but raise exception instead of
490
        returning boolean if content label is already used
491
        """
492
        if not self._is_content_label_free(
493
            label,
494
            workspace,
495
            parent,
496
            exclude_content_id
497
        ):
498
            text = 'A Content already exist with the same label {label} ' \
499
                   ' in workspace {workspace_id}'.format(
500
                      label=label,
501
                      workspace_id=workspace.workspace_id,
502
                   )
503
            if parent:
504
                text = '{text} and parent as content {parent_id}'.format(
505
                    text=text,
506
                    parent_id=parent.parent_id
507
                )
508
            raise ContentLabelAlreadyUsedHere(text)
509
510
    def create(self, content_type_slug: str, workspace: Workspace, parent: Content=None, label: str = '', filename: str = '', do_save=False, is_temporary: bool=False, do_notify=True) -> Content:
511
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
512
        assert content_type_slug != CONTENT_TYPES.Any_SLUG
513
        assert not (label and filename)
514
515
        if content_type_slug == FOLDER_TYPE and not label:
516
            label = self.generate_folder_label(workspace, parent)
517
518
        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
519
        # can call here with None. Must update create_comment tu require the
520
        # workspace.
521
        if not workspace:
522
            workspace = parent.workspace
523
524
        content_type = CONTENT_TYPES.get_one_by_slug(content_type_slug)
525
        if parent and parent.properties and 'allowed_content' in parent.properties:
526
            if content_type.slug not in parent.properties['allowed_content'] or not parent.properties['allowed_content'][content_type.slug]:
527
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
528
                    subcontent_type=content_type.slug,
529
                    content_id=parent.content_id,
530
                ))
531
        if not workspace and parent:
532
            workspace = parent.workspace
533
534
        if workspace:
535
            if content_type.slug not in workspace.get_allowed_content_types():
536
                raise UnallowedSubContent(
537
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
538
                        subcontent_type=content_type.slug,
539
                        content_id=workspace.workspace_id,
540
                    )
541
                )
542
        if filename:
543
            label = os.path.splitext(filename)[0]
544
        if label:
545
            self._is_content_label_free_or_raise(
546
                label,
547
                workspace,
548
                parent,
549
            )
550
551
        content = Content()
552
        if filename:
553
            # INFO - G.M - 2018-07-04 - File_name setting automatically
554
            # set label and file_extension
555
            content.file_name = label
556
        elif label:
557
            content.label = label
558
        else:
559
            if content_type_slug == CONTENT_TYPES.Comment.slug:
560
                # INFO - G.M - 2018-07-16 - Default label for comments is
561
                # empty string.
562
                content.label = ''
563
            else:
564
                raise EmptyLabelNotAllowed('Content of this type should have a valid label')  # nopep8
565
566
        content.owner = self._user
567
        content.parent = parent
568
569
        content.workspace = workspace
570
        content.type = content_type.slug
571
        content.is_temporary = is_temporary
572
        content.revision_type = ActionDescription.CREATION
573
574
        if content.type in (
575
                CONTENT_TYPES.Page.slug,
576
                CONTENT_TYPES.Thread.slug,
577
        ):
578
            content.file_extension = '.html'
579
580
        if do_save:
581
            self._session.add(content)
582
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
583
        return content
584
585
    def create_comment(self, workspace: Workspace=None, parent: Content=None, content:str ='', do_save=False) -> Content:
586
        # TODO: check parent allowed_type and workspace allowed_ type
587
        assert parent and parent.type != FOLDER_TYPE
588
        if not content:
589
            raise EmptyCommentContentNotAllowed()
590
591
        item = self.create(
592
            content_type_slug=CONTENT_TYPES.Comment.slug,
593
            workspace=workspace,
594
            parent=parent,
595
            do_notify=False,
596
            do_save=False,
597
            label='',
598
        )
599
        item.description = content
600
        item.revision_type = ActionDescription.COMMENT
601
602
        if do_save:
603
            self.save(item, ActionDescription.COMMENT)
604
        return item
605
606
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
607
        """
608
        This method is a hack to convert a node revision item into a node
609
        :param content_id:
610
        :param content_type:
611
        :param workspace:
612
        :param revision_id:
613
        :return:
614
        """
615
616
        content = self.get_one(content_id, content_type, workspace)
617
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id==revision_id).one()
618
619
        if revision.content_id==content.content_id:
620
            content.revision_to_serialize = revision.revision_id
621
        else:
622
            raise ValueError('Revision not found for given content')
623
624
        return content
625
626
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
627
628
        if not content_id:
629
            return None
630
631
        base_request = self._base_query(workspace).filter(Content.content_id==content_id)
632
633
        if content_type!=CONTENT_TYPES.Any_SLUG:
634
            base_request = base_request.filter(Content.type==content_type)
635
636
        if parent:
637
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
638
639
        try:
640
            content = base_request.one()
641
        except NoResultFound as exc:
642
            # TODO - G.M - 2018-07-16 - Add better support for all different
643
            # error case who can happened here
644
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
645
            # wrong access to this workspace, wrong base filter according
646
            # to content_status.
647
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
648
        return content
649
650
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
651
        """
652
        This method allow us to get directly any revision with its id
653
        :param revision_id: The content's revision's id that we want to return
654
        :param content: The content related to the revision, if None do not
655
        check if revision is related to this content.
656
        :return: An item Content linked with the correct revision
657
        """
658
        assert revision_id is not None# DYN_REMOVE
659
660
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
661
        if content and revision.content_id != content.content_id:
662
            raise RevisionDoesNotMatchThisContent(
663
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
664
                    revision_id=revision.revision_id,
665
                    content_id=content.content_id,
666
                    )
667
            )
668
        return revision
669
670
    # INFO - A.P - 2017-07-03 - python file object getter
671
    # in case of we cook a version of preview manager that allows a pythonic
672
    # access to files
673
    # def get_one_revision_file(self, revision_id: int = None):
674
    #     """
675
    #     This function allows us to directly get a Python file object from its
676
    #     revision identifier.
677
    #     :param revision_id: The revision id of the file we want to return
678
    #     :return: The corresponding Python file object
679
    #     """
680
    #     revision = self.get_one_revision(revision_id)
681
    #     return DepotManager.get().get(revision.depot_file)
682
683
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
684
        """
685
        This method allows us to directly get a file path from its revision
686
        identifier.
687
        :param revision_id: The revision id of the filepath we want to return
688
        :return: The corresponding filepath
689
        """
690
        revision = self.get_one_revision(revision_id)
691
        depot = DepotManager.get()
692
        depot_stored_file = depot.get(revision.depot_file)  # type: StoredFile
693
        depot_file_path = depot_stored_file._file_path  # type: str
694
        return depot_file_path
695
696
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
697
    def get_one_by_label_and_parent(
698
            self,
699
            content_label: str,
700
            content_parent: Content=None,
701
    ) -> Content:
702
        """
703
        This method let us request the database to obtain a Content with its name and parent
704
        :param content_label: Either the content's label or the content's filename if the label is None
705
        :param content_parent: The parent's content
706
        :param workspace: The workspace's content
707
        :return The corresponding Content
708
        """
709
        workspace = content_parent.workspace if content_parent else None
710
        query = self._base_query(workspace)
711
        parent_id = content_parent.content_id if content_parent else None
712
        query = query.filter(Content.parent_id == parent_id)
713
714
        file_name, file_extension = os.path.splitext(content_label)
715
716
        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
717
        # rewritten in order to avoid content_type hardcoded code there
718
        return query.filter(
719
            or_(
720
                and_(
721
                    Content.type == CONTENT_TYPES.File.slug,
722
                    Content.label == file_name,
723
                    Content.file_extension == file_extension,
724
                ),
725
                and_(
726
                    Content.type == CONTENT_TYPES.Thread.slug,
727
                    Content.label == file_name,
728
                ),
729
                and_(
730
                    Content.type == CONTENT_TYPES.Page.slug,
731
                    Content.label == file_name,
732
                ),
733
                and_(
734
                    Content.type == CONTENT_TYPES.Folder.slug,
735
                    Content.label == content_label,
736
                ),
737
            )
738
        ).one()
739
740
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
741
    def get_one_by_label_and_parent_labels(
742
            self,
743
            content_label: str,
744
            workspace: Workspace,
745
            content_parent_labels: [str]=None,
746
    ):
747
        """
748
        Return content with it's label, workspace and parents labels (optional)
749
        :param content_label: label of content (label or file_name)
750
        :param workspace: workspace containing all of this
751
        :param content_parent_labels: Ordered list of labels representing path
752
            of folder (without workspace label).
753
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
754
        :return: Found Content
755
        """
756
        query = self._base_query(workspace)
757
        parent_folder = None
758
759
        # Grab content parent folder if parent path given
760
        if content_parent_labels:
761
            parent_folder = self.get_folder_with_workspace_path_labels(
762
                content_parent_labels,
763
                workspace,
764
            )
765
766
        # Build query for found content by label
767
        content_query = self.filter_query_for_content_label_as_path(
768
            query=query,
769
            content_label_as_file=content_label,
770
        )
771
772
        # Modify query to apply parent folder filter if any
773
        if parent_folder:
774
            content_query = content_query.filter(
775
                Content.parent_id == parent_folder.content_id,
776
            )
777
        else:
778
            content_query = content_query.filter(
779
                Content.parent_id == None,
780
            )
781
782
        # Filter with workspace
783
        content_query = content_query.filter(
784
            Content.workspace_id == workspace.workspace_id,
785
        )
786
787
        # Return the content
788
        return content_query\
789
            .order_by(
790
                Content.revision_id.desc(),
791
            )\
792
            .one()
793
794
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
795
    def get_folder_with_workspace_path_labels(
796
            self,
797
            path_labels: [str],
798
            workspace: Workspace,
799
    ) -> Content:
800
        """
801
        Return a Content folder for given relative path.
802
        TODO BS 20161124: Not safe if web interface allow folder duplicate names
803
        :param path_labels: List of labels representing path of folder
804
        (without workspace label).
805
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
806
        :param workspace: workspace of folders
807
        :return: Content folder
808
        """
809
        query = self._base_query(workspace)
810
        folder = None
811
812
        for label in path_labels:
813
            # Filter query on label
814
            folder_query = query \
815
                .filter(
816
                    Content.type == CONTENT_TYPES.Folder.slug,
817
                    Content.label == label,
818
                    Content.workspace_id == workspace.workspace_id,
819
                )
820
821
            # Search into parent folder (if already deep)
822
            if folder:
823
                folder_query = folder_query\
824
                    .filter(
825
                        Content.parent_id == folder.content_id,
826
                    )
827
            else:
828
                folder_query = folder_query \
829
                    .filter(Content.parent_id == None)
830
831
            # Get thirst corresponding folder
832
            folder = folder_query \
833
                .order_by(Content.revision_id.desc()) \
834
                .one()
835
836
        return folder
837
838
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method already needed ?
839
    def filter_query_for_content_label_as_path(
840
            self,
841
            query: Query,
842
            content_label_as_file: str,
843
            is_case_sensitive: bool = False,
844
    ) -> Query:
845
        """
846
        Apply normalised filters to found Content corresponding as given label.
847
        :param query: query to modify
848
        :param content_label_as_file: label in this
849
        FILE version, use Content.get_label_as_file().
850
        :param is_case_sensitive: Take care about case or not
851
        :return: modified query
852
        """
853
        file_name, file_extension = os.path.splitext(content_label_as_file)
854
855
        label_filter = Content.label == content_label_as_file
856
        file_name_filter = Content.label == file_name
857
        file_extension_filter = Content.file_extension == file_extension
858
859
        if not is_case_sensitive:
860
            label_filter = func.lower(Content.label) == \
861
                           func.lower(content_label_as_file)
862
            file_name_filter = func.lower(Content.label) == \
863
                               func.lower(file_name)
864
            file_extension_filter = func.lower(Content.file_extension) == \
865
                                    func.lower(file_extension)
866
867
        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
868
        # rewritten in order to avoid content_type hardcoded code there
869
        return query.filter(or_(
870
            and_(
871
                Content.type == CONTENT_TYPES.File.slug,
872
                file_name_filter,
873
                file_extension_filter,
874
            ),
875
            and_(
876
                Content.type == CONTENT_TYPES.Thread.slug,
877
                file_name_filter,
878
                file_extension_filter,
879
            ),
880
            and_(
881
                Content.type == CONTENT_TYPES.Page.slug,
882
                file_name_filter,
883
                file_extension_filter,
884
            ),
885
            and_(
886
                Content.type == CONTENT_TYPES.Folder.slug,
887
                label_filter,
888
            ),
889
        ))
890
891
    def get_pdf_preview_path(
892
            self,
893
            content_id: int,
894
            revision_id: int,
895
            page_number: int
896
    ) -> str:
897
        """
898
        Get pdf preview of revision of content
899
        :param content_id: id of content
900
        :param revision_id: id of content revision
901
        :param page_number: page number of the preview, useful for multipage content
902
        :return: preview_path as string
903
        """
904
        file_path = self.get_one_revision_filepath(revision_id)
905
        try:
906
            page_number = preview_manager_page_format(page_number)
907
            if page_number >= self.preview_manager.get_page_nb(file_path):
908
                raise PageOfPreviewNotFound(
909
                    'page_number {page_number} of content {content_id} does not exist'.format(
910
                        page_number=page_number,
911
                        content_id=content_id
912
                    ),
913
                )
914
            jpg_preview_path = self.preview_manager.get_pdf_preview(
915
                file_path,
916
                page=page_number
917
            )
918
        except UnsupportedMimeType as exc:
919
            raise UnavailablePreview(
920
                'No preview available for content {}, revision {}'.format(content_id, revision_id) # nopep8
921
            ) from exc
922
        return jpg_preview_path
923
924
    def get_full_pdf_preview_path(self, revision_id: int) -> str:
925
        """
926
        Get full(multiple page) pdf preview of revision of content
927
        :param revision_id: id of revision
928
        :return: path of the full pdf preview of this revision
929
        """
930
        file_path = self.get_one_revision_filepath(revision_id)
931
        try:
932
            pdf_preview_path = self.preview_manager.get_pdf_preview(file_path)
933
        except UnsupportedMimeType as exc:
934
            raise UnavailablePreview(
935
                'No preview available for revision {}'.format(revision_id)
936
            ) from exc
937
        return pdf_preview_path
938
939
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
940
        """
941
        Get jpg preview allowed dimensions and strict bool param.
942
        """
943
        return PreviewAllowedDim(
944
            self._config.PREVIEW_JPG_RESTRICTED_DIMS,
945
            self._config.PREVIEW_JPG_ALLOWED_DIMS,
946
        )
947
948
    def get_jpg_preview_path(
949
        self,
950
        content_id: int,
951
        revision_id: int,
952
        page_number: int,
953
        width: int = None,
954
        height: int = None,
955
    ) -> str:
956
        """
957
        Get jpg preview of revision of content
958
        :param content_id: id of content
959
        :param revision_id: id of content revision
960
        :param page_number: page number of the preview, useful for multipage content
961
        :param width: width in pixel
962
        :param height: height in pixel
963
        :return: preview_path as string
964
        """
965
        file_path = self.get_one_revision_filepath(revision_id)
966
        try:
967
            page_number = preview_manager_page_format(page_number)
968
            if page_number >= self.preview_manager.get_page_nb(file_path):
969
                raise PageOfPreviewNotFound(
970
                    'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
971
                        page_number=page_number,
972
                        revision_id=revision_id,
973
                        content_id=content_id,
974
                    ),
975
                )
976
            if not width and not height:
977
                width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
978
                height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height
979
980
            allowed_dim = False
981
            for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS:
982
                if width == preview_dim.width and height == preview_dim.height:
983
                    allowed_dim = True
984
                    break
985
986
            if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
987
                raise PreviewDimNotAllowed(
988
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
989
                        width=width,
990
                        height=height,
991
                    )
992
                )
993
            jpg_preview_path = self.preview_manager.get_jpeg_preview(
994
                file_path,
995
                page=page_number,
996
                width=width,
997
                height=height,
998
            )
999
        except UnsupportedMimeType as exc:
1000
            raise UnavailablePreview(
1001
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
1002
            ) from exc
1003
        return jpg_preview_path
1004
1005
    def _get_all_query(
1006
        self,
1007
        parent_id: int = None,
1008
        content_type_slug: str = CONTENT_TYPES.Any_SLUG,
1009
        workspace: Workspace = None,
1010
        label:str = None,
1011
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
1012
    ) -> Query:
1013
        """
1014
        Extended filter for better "get all data" query
1015
        :param parent_id: filter by parent_id
1016
        :param content_type_slug: filter by content_type slug
1017
        :param workspace: filter by workspace
1018
        :param order_by_properties: filter by properties can be both string of
1019
        attribute or attribute of Model object from sqlalchemy(preferred way,
1020
        QueryableAttribute object)
1021
        :return: Query object
1022
        """
1023
        order_by_properties = order_by_properties or []  # FDV
1024
        assert parent_id is None or isinstance(parent_id, int)
1025
        assert content_type_slug is not None
1026
        resultset = self._base_query(workspace)
1027
1028
        if content_type_slug != CONTENT_TYPES.Any_SLUG:
1029
            # INFO - G.M - 2018-07-05 - convert with
1030
            #  content type object to support legacy slug
1031
            content_type_object = CONTENT_TYPES.get_one_by_slug(content_type_slug)
1032
            all_slug_alias = [content_type_object.slug]
1033
            if content_type_object.slug_alias:
1034
                all_slug_alias.extend(content_type_object.slug_alias)
1035
            resultset = resultset.filter(Content.type.in_(all_slug_alias))
1036
1037
        if parent_id:
1038
            resultset = resultset.filter(Content.parent_id==parent_id)
1039
        if parent_id == 0 or parent_id is False:
1040
            resultset = resultset.filter(Content.parent_id == None)
1041
        if label:
1042
            resultset = resultset.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8
1043
1044
        for _property in order_by_properties:
1045
            resultset = resultset.order_by(_property)
1046
1047
        return resultset
1048
1049
    def get_all(
1050
            self,
1051
            parent_id: int=None,
1052
            content_type: str=CONTENT_TYPES.Any_SLUG,
1053
            workspace: Workspace=None,
1054
            label: str=None,
1055
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
1056
    ) -> typing.List[Content]:
1057
        """
1058
        Return all content using some filters
1059
        :param parent_id: filter by parent_id
1060
        :param content_type: filter by content_type slug
1061
        :param workspace: filter by workspace
1062
        :param order_by_properties: filter by properties can be both string of
1063
        attribute or attribute of Model object from sqlalchemy(preferred way,
1064
        QueryableAttribute object)
1065
        :return: List of contents
1066
        """
1067
        order_by_properties = order_by_properties or []  # FDV
1068
        return self._get_all_query(parent_id, content_type, workspace, label, order_by_properties).all()
1069
1070
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1071
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
1072
    #     """
1073
    #     Return parent_id childs of given content_types
1074
    #     :param parent_id: parent id
1075
    #     :param content_types: list of types
1076
    #     :param workspace: workspace filter
1077
    #     :return: list of content
1078
    #     """
1079
    #     resultset = self._base_query(workspace)
1080
    #     resultset = resultset.filter(Content.type.in_(content_types))
1081
    #
1082
    #     if parent_id:
1083
    #         resultset = resultset.filter(Content.parent_id==parent_id)
1084
    #     if parent_id is False:
1085
    #         resultset = resultset.filter(Content.parent_id == None)
1086
    #
1087
    #     return resultset.all()
1088
1089
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1090
    # TODO find an other name to filter on is_deleted / is_archived
1091
    def get_all_with_filter(self, parent_id: int=None, content_type: str=CONTENT_TYPES.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
1092
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1093
        assert content_type is not None# DYN_REMOVE
1094
        assert isinstance(content_type, str) # DYN_REMOVE
1095
1096
        resultset = self._base_query(workspace)
1097
1098
        if content_type != CONTENT_TYPES.Any_SLUG:
1099
            resultset = resultset.filter(Content.type==content_type)
1100
1101
        resultset = resultset.filter(Content.is_deleted == self._show_deleted)
1102
        resultset = resultset.filter(Content.is_archived == self._show_archived)
1103
        resultset = resultset.filter(Content.is_temporary == self._show_temporary)
1104
1105
        resultset = resultset.filter(Content.parent_id==parent_id)
1106
1107
        return resultset.all()
1108
1109
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1110
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1111
        assert content_type is not None# DYN_REMOVE
1112
1113
        resultset = self._base_query(workspace)
1114
1115
        if content_type != CONTENT_TYPES.Any_SLUG:
1116
            resultset = resultset.filter(Content.type==content_type)
1117
1118
        return resultset.all()
1119
1120
    def get_last_unread(self, parent_id: int, content_type: str,
1121
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
1122
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1123
        assert content_type is not None# DYN_REMOVE
1124
        assert isinstance(content_type, str) # DYN_REMOVE
1125
1126
        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1127
            .filter(RevisionReadStatus.user_id==self._user_id)
1128
1129
        not_read_revisions = self._revisions_base_query(workspace) \
1130
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1131
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1132
            .filter(Workspace.is_deleted.is_(False)) \
1133
            .subquery()
1134
1135
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1136
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1137
    #     assert content_type is not None# DYN_REMOVE
1138
    #
1139
    #     resultset = self._base_query(workspace)
1140
    #
1141
    #     if content_type != CONTENT_TYPES.Any_SLUG:
1142
    #         resultset = resultset.filter(Content.type==content_type)
1143
    #
1144
    #     return resultset.all()
1145
1146
    def get_last_active(
1147
            self,
1148
            workspace: Workspace=None,
1149
            limit: typing.Optional[int]=None,
1150
            before_content: typing.Optional[Content]= None,
1151
            content_ids: typing.Optional[typing.List[int]] = None,
1152
    ) -> typing.List[Content]:
1153
        """
1154
        get contents list sorted by last update
1155
        (last modification of content itself or one of this comment)
1156
        :param workspace: Workspace to check
1157
        :param limit: maximum number of elements to return
1158
        :param before_content: last_active content are only those updated
1159
         before this content given.
1160
        :param content_ids: restrict selection to some content ids and
1161
        related Comments
1162
        :return: list of content
1163
        """
1164
1165
        resultset = self._get_all_query(
1166
            workspace=workspace,
1167
        )
1168
        if content_ids:
1169
            resultset = resultset.filter(
1170
                or_(
1171
                    Content.content_id.in_(content_ids),
1172
                    and_(
1173
                        Content.parent_id.in_(content_ids),
1174
                        Content.type == CONTENT_TYPES.Comment.slug
1175
                    )
1176
                )
1177
            )
1178
1179
        resultset = resultset.order_by(desc(Content.updated))
1180
1181
        active_contents = []
1182
        too_recent_content = []
1183
        before_content_find = False
1184
        for content in resultset:
1185
            related_active_content = None
1186
            if CONTENT_TYPES.Comment.slug == content.type:
1187
                related_active_content = content.parent
1188
            else:
1189
                related_active_content = content
1190
1191
            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
1192
            # issue with comments
1193
            if not self._show_deleted and related_active_content.is_deleted:
1194
                continue
1195
            if not self._show_archived and related_active_content.is_archived:
1196
                continue
1197
1198
            if related_active_content not in active_contents and related_active_content not in too_recent_content:  # nopep8
1199
1200
                if not before_content or before_content_find:
1201
                    active_contents.append(related_active_content)
1202
                else:
1203
                    too_recent_content.append(related_active_content)
1204
1205
                if before_content and related_active_content == before_content:
1206
                    before_content_find = True
1207
1208
            if limit and len(active_contents) >= limit:
1209
                break
1210
1211
        return active_contents
1212
1213
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1214
    # usable and efficient for tracim v2 to get content with read/unread status
1215
    # instead of relying on has_new_information_for()
1216
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1217
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1218
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1219
    #     assert content_type is not None# DYN_REMOVE
1220
    #     assert isinstance(content_type, str) # DYN_REMOVE
1221
    #
1222
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1223
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1224
    #
1225
    #     not_read_revisions = self._revisions_base_query(workspace) \
1226
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1227
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1228
    #         .filter(Workspace.is_deleted.is_(False)) \
1229
    #         .subquery()
1230
    #
1231
    #     not_read_content_ids_query = self._session.query(
1232
    #         distinct(not_read_revisions.c.content_id)
1233
    #     )
1234
    #     not_read_content_ids = list(map(
1235
    #         itemgetter(0),
1236
    #         not_read_content_ids_query,
1237
    #     ))
1238
    #
1239
    #     not_read_contents = self._base_query(workspace) \
1240
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1241
    #         .order_by(desc(Content.updated))
1242
    #
1243
    #     if content_type != CONTENT_TYPES.Any_SLUG:
1244
    #         not_read_contents = not_read_contents.filter(
1245
    #             Content.type==content_type)
1246
    #     else:
1247
    #         not_read_contents = not_read_contents.filter(
1248
    #             Content.type!=CONTENT_TYPES.Folder.slug)
1249
    #
1250
    #     if parent_id:
1251
    #         not_read_contents = not_read_contents.filter(
1252
    #             Content.parent_id==parent_id)
1253
    #
1254
    #     result = []
1255
    #     for item in not_read_contents:
1256
    #         new_item = None
1257
    #         if CONTENT_TYPES.Comment.slug == item.type:
1258
    #             new_item = item.parent
1259
    #         else:
1260
    #             new_item = item
1261
    #
1262
    #         # INFO - D.A. - 2015-05-20
1263
    #         # We do not want to show only one item if the last 10 items are
1264
    #         # comments about one thread for example
1265
    #         if new_item not in result:
1266
    #             result.append(new_item)
1267
    #
1268
    #         if len(result) >= limit:
1269
    #             break
1270
    #
1271
    #     return result
1272
1273
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
1274
        """
1275
        :param content: the given content instance
1276
        :param allowed_content_dict: must be something like this:
1277
            dict(
1278
                folder = True
1279
                thread = True,
1280
                file = False,
1281
                page = True
1282
            )
1283
        :return: nothing
1284
        """
1285
        properties = content.properties.copy()
1286
        properties['allowed_content'] = allowed_content_dict
1287
        content.properties = properties
1288
1289
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
1290
        """
1291
        :param content: the given content instance
1292
        :param allowed_content_type_slug_list: list of content_type_slug to
1293
        accept as subcontent.
1294
        :return: nothing
1295
        """
1296
        allowed_content_dict = {}
1297
        for allowed_content_type_slug in allowed_content_type_slug_list:
1298
            if allowed_content_type_slug not in CONTENT_TYPES.endpoint_allowed_types_slug():
1299
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
1300
            allowed_content_dict[allowed_content_type_slug] = True
1301
1302
        self._set_allowed_content(content, allowed_content_dict)
1303
1304
    def restore_content_default_allowed_content(self, content: Content) -> None:
1305
        """
1306
        Return to default allowed_content_types
1307
        :param content: the given content instance
1308
        :return: nothing
1309
        """
1310
        if content._properties and 'allowed_content' in content._properties:
1311
            properties = content.properties.copy()
1312
            del properties['allowed_content']
1313
            content.properties = properties
1314
1315
    def set_status(self, content: Content, new_status: str):
1316
        if new_status in CONTENT_STATUS.get_all_slugs_values():
1317
            content.status = new_status
1318
            content.revision_type = ActionDescription.STATUS_UPDATE
1319
        else:
1320
            raise ValueError('The given value {} is not allowed'.format(new_status))
1321
1322
    def move(self,
1323
             item: Content,
1324
             new_parent: Content,
1325
             must_stay_in_same_workspace: bool=True,
1326
             new_workspace: Workspace=None,
1327
    ):
1328
        if must_stay_in_same_workspace:
1329
            if new_parent and new_parent.workspace_id != item.workspace_id:
1330
                raise ValueError('the item should stay in the same workspace')
1331
1332
        item.parent = new_parent
1333
        if new_workspace:
1334
            item.workspace = new_workspace
1335
            if new_parent and \
1336
                    new_parent.workspace_id != new_workspace.workspace_id:
1337
                raise WorkspacesDoNotMatch(
1338
                    'new parent workspace and new workspace should be the same.'
1339
                )
1340
        else:
1341
            if new_parent:
1342
                item.workspace = new_parent.workspace
1343
1344
        self._is_content_label_free_or_raise(
1345
            item.label,
1346
            item.workspace,
1347
            item.parent,
1348
            exclude_content_id=item.content_id
1349
        )
1350
        item.revision_type = ActionDescription.MOVE
1351
1352
    def copy(
1353
        self,
1354
        item: Content,
1355
        new_parent: Content=None,
1356
        new_label: str=None,
1357
        do_save: bool=True,
1358
        do_notify: bool=True,
1359
    ) -> Content:
1360
        """
1361
        Copy nearly all content, revision included. Children not included, see
1362
        "copy_children" for this.
1363
        :param item: Item to copy
1364
        :param new_parent: new parent of the new copied item
1365
        :param new_label: new label of the new copied item
1366
        :param do_notify: notify copy or not
1367
        :return: Newly copied item
1368
        """
1369
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
1370
            # TODO - G.M - 08-03-2018 - Use something else than value error
1371
            raise ValueError("You can't copy file into itself")
1372
        if new_parent:
1373
            workspace = new_parent.workspace
1374
            parent = new_parent
1375
        else:
1376
            workspace = item.workspace
1377
            parent = item.parent
1378
        label = new_label or item.label
1379
1380
        self._is_content_label_free_or_raise(label, workspace, parent)
1381
        content = item.copy(parent)
1382
        # INFO - GM - 15-03-2018 - add "copy" revision
1383
        with new_revision(
1384
            session=self._session,
1385
            tm=transaction.manager,
1386
            content=content,
1387
            force_create_new_revision=True
1388
        ) as rev:
1389
            rev.parent = parent
1390
            rev.workspace = workspace
1391
            rev.label = label
1392
            rev.revision_type = ActionDescription.COPY
1393
            rev.properties['origin'] = {
1394
                'content': item.id,
1395
                'revision': item.last_revision.revision_id,
1396
            }
1397
        if do_save:
1398
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
1399
        return content
1400
1401
    def copy_children(self, origin_content: Content, new_content: Content):
1402
        for child in origin_content.children:
1403
            self.copy(child, new_content)
1404
1405
    def move_recursively(self, item: Content,
1406
                         new_parent: Content, new_workspace: Workspace):
1407
        self.move(item, new_parent, False, new_workspace)
1408
        self.save(item, do_notify=False)
1409
1410
        for child in item.children:
1411
            with new_revision(
1412
                session=self._session,
1413
                tm=transaction.manager,
1414
                content=child
1415
            ):
1416
                self.move_recursively(child, item, new_workspace)
1417
        return
1418
1419
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
1420
        if item.status and CONTENT_STATUS.get_one_by_slug(item.status).global_status == GlobalStatus.CLOSED.value:  # nopep8
1421
            raise ContentClosed("Can't update closed file, you need to change his status before any change.")  # nopep8
1422
        if item.label == new_label and item.description == new_content:
1423
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
1424
            # Internatization disabled in libcontent for now.
1425
            raise SameValueError('The content did not changed')
1426
        if not new_label:
1427
            raise EmptyLabelNotAllowed()
1428
1429
        self._is_content_label_free_or_raise(
1430
            new_label,
1431
            item.workspace,
1432
            item.parent,
1433
            exclude_content_id=item.content_id
1434
        )
1435
1436
        item.owner = self._user
1437
        item.label = new_label
1438
        item.description = new_content if new_content else item.description # TODO: convert urls into links
1439
        item.revision_type = ActionDescription.EDITION
1440
        return item
1441
1442
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
1443
        if item.status and CONTENT_STATUS.get_one_by_slug(item.status).global_status == GlobalStatus.CLOSED.value:  # nopep8
1444
            raise ContentClosed("Can't update closed file, you need to change his status before any change.")  # nopep8
1445
        if new_mimetype == item.file_mimetype and \
1446
                new_content == item.depot_file.file.read():
1447
            raise SameValueError('The content did not changed')
1448
        item.owner = self._user
1449
        item.file_name = new_filename
1450
        item.file_mimetype = new_mimetype
1451
        item.depot_file = FileIntent(
1452
            new_content,
1453
            new_filename,
1454
            new_mimetype,
1455
        )
1456
        item.revision_type = ActionDescription.REVISION
1457
        return item
1458
1459
    def archive(self, content: Content):
1460
        content.owner = self._user
1461
        content.is_archived = True
1462
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1463
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1464
        # un-archiving file.
1465
        content.label = '{label}-{action}-{date}'.format(
1466
            label=content.label,
1467
            action='archived',
1468
            date=current_date_for_filename()
1469
        )
1470
        content.revision_type = ActionDescription.ARCHIVING
1471
1472
    def unarchive(self, content: Content):
1473
        content.owner = self._user
1474
        content.is_archived = False
1475
        content.revision_type = ActionDescription.UNARCHIVING
1476
1477
    def delete(self, content: Content):
1478
        content.owner = self._user
1479
        content.is_deleted = True
1480
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1481
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1482
        # un-deleting file.
1483
        content.label = '{label}-{action}-{date}'.format(
1484
            label=content.label,
1485
            action='deleted',
1486
            date=current_date_for_filename()
1487
        )
1488
        content.revision_type = ActionDescription.DELETION
1489
1490
    def undelete(self, content: Content):
1491
        content.owner = self._user
1492
        content.is_deleted = False
1493
        content.revision_type = ActionDescription.UNDELETION
1494
1495
    def get_preview_page_nb(self, revision_id: int) -> typing.Optional[int]:
1496
        file_path = self.get_one_revision_filepath(revision_id)
1497
        try:
1498
            nb_pages = self.preview_manager.get_page_nb(file_path)
1499
        except UnsupportedMimeType:
1500
            return None
1501
        return nb_pages
1502
1503
    def has_pdf_preview(self, revision_id: int) -> bool:
1504
        file_path = self.get_one_revision_filepath(revision_id)
1505
        try:
1506
            has_pdf_preview = self.preview_manager.has_pdf_preview(file_path)
1507
        except UnsupportedMimeType:
1508
            has_pdf_preview = False
1509
        return has_pdf_preview
1510
1511
    def mark_read__all(
1512
            self,
1513
            read_datetime: datetime=None,
1514
            do_flush: bool=True,
1515
            recursive: bool=True
1516
    ) -> None:
1517
        """
1518
        Read content of all workspace visible for the user.
1519
        :param read_datetime: date of readigin
1520
        :param do_flush: flush database
1521
        :param recursive: mark read subcontent too
1522
        :return: nothing
1523
        """
1524
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive) # nopep8
1525
1526
    def mark_read__workspace(self,
1527
                       workspace : Workspace,
1528
                       read_datetime: datetime=None,
1529
                       do_flush: bool=True,
1530
                       recursive: bool=True
1531
    ) -> None:
1532
        """
1533
        Read content of a workspace visible for the user.
1534
        :param read_datetime: date of readigin
1535
        :param do_flush: flush database
1536
        :param recursive: mark read subcontent too
1537
        :return: nothing
1538
        """
1539
        itemset = self.get_last_active(workspace)
1540
        for item in itemset:
1541
            if item.has_new_information_for(self._user):
1542
                self.mark_read(item, read_datetime, do_flush, recursive)
1543
1544
    def mark_read(
1545
            self,
1546
            content: Content,
1547
            read_datetime: datetime=None,
1548
            do_flush: bool=True,
1549
            recursive: bool=True
1550
    ) -> Content:
1551
        """
1552
        Read content for the user.
1553
        :param read_datetime: date of readigin
1554
        :param do_flush: flush database
1555
        :param recursive: mark read subcontent too
1556
        :return: nothing
1557
        """
1558
        assert self._user
1559
        assert content
1560
1561
        # The algorithm is:
1562
        # 1. define the read datetime
1563
        # 2. update all revisions related to current Content
1564
        # 3. do the same for all child revisions
1565
        #    (ie parent_id is content_id of current content)
1566
1567
        if not read_datetime:
1568
            read_datetime = datetime.datetime.now()
1569
1570
        viewed_revisions = self._session.query(ContentRevisionRO) \
1571
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1572
1573
        for revision in viewed_revisions:
1574
            revision.read_by[self._user] = read_datetime
1575
1576
        if recursive:
1577
            # mark read :
1578
            # - all children
1579
            # - parent stuff (if you mark a comment as read,
1580
            #                 then you have seen the parent)
1581
            # - parent comments
1582
            for child in content.get_valid_children():
1583
                self.mark_read(child, read_datetime=read_datetime,
1584
                               do_flush=False)
1585
1586
            if CONTENT_TYPES.Comment.slug == content.type:
1587
                self.mark_read(content.parent, read_datetime=read_datetime,
1588
                               do_flush=False, recursive=False)
1589
                for comment in content.parent.get_comments():
1590
                    if comment != content:
1591
                        self.mark_read(comment, read_datetime=read_datetime,
1592
                                       do_flush=False, recursive=False)
1593
1594
        if do_flush:
1595
            self.flush()
1596
1597
        return content
1598
1599
    def mark_unread(self, content: Content, do_flush=True) -> Content:
1600
        assert self._user
1601
        assert content
1602
1603
        revisions = self._session.query(ContentRevisionRO) \
1604
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1605
1606
        for revision in revisions:
1607
            try:
1608
                del revision.read_by[self._user]
1609
            except KeyError:
1610
                pass
1611
1612
        for child in content.get_valid_children():
1613
            self.mark_unread(child, do_flush=False)
1614
1615
        if do_flush:
1616
            self.flush()
1617
1618
        return content
1619
1620
    def flush(self):
1621
        self._session.flush()
1622
1623
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
1624
        """
1625
        Save an object, flush the session and set the revision_type property
1626
        :param content:
1627
        :param action_description:
1628
        :return:
1629
        """
1630
        assert action_description is None or action_description in ActionDescription.allowed_values()
1631
1632
        if not action_description:
1633
            # See if the last action has been modified
1634
            if content.revision_type==None or len(get_history(content.revision, 'revision_type'))<=0:
1635
                # The action has not been modified, so we set it to default edition
1636
                action_description = ActionDescription.EDITION
1637
1638
        if action_description:
1639
            content.revision_type = action_description
1640
1641
        if do_flush:
1642
            # INFO - 2015-09-03 - D.A.
1643
            # There are 2 flush because of the use
1644
            # of triggers for content creation
1645
            #
1646
            # (when creating a content, actually this is an insert of a new
1647
            # revision in content_revisions ; so the mark_read operation need
1648
            # to get full real data from database before to be prepared.
1649
1650
            self._session.add(content)
1651
            self._session.flush()
1652
1653
            # TODO - 2015-09-03 - D.A. - Do not use triggers
1654
            # We should create a new ContentRevisionRO object instead of Content
1655
            # This would help managing view/not viewed status
1656
            self.mark_read(content, do_flush=True)
1657
1658
        if do_notify:
1659
            self.do_notify(content)
1660
1661
    def do_notify(self, content: Content):
1662
        """
1663
        Allow to force notification for a given content. By default, it is
1664
        called during the .save() operation
1665
        :param content:
1666
        :return:
1667
        """
1668
        NotifierFactory.create(
1669
            config=self._config,
1670
            current_user=self._user,
1671
            session=self._session,
1672
        ).notify_content_update(content)
1673
1674
    def get_keywords(self, search_string, search_string_separators=None) -> [str]:
1675
        """
1676
        :param search_string: a list of coma-separated keywords
1677
        :return: a list of str (each keyword = 1 entry
1678
        """
1679
1680
        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS
1681
1682
        keywords = []
1683
        if search_string:
1684
            keywords = [keyword.strip() for keyword in re.split(search_string_separators, search_string)]
1685
1686
        return keywords
1687
1688
    def search(self, keywords: [str]) -> Query:
1689
        """
1690
        :return: a sorted list of Content items
1691
        """
1692
1693
        if len(keywords)<=0:
1694
            return None
1695
1696
        filter_group_label = list(Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords)
1697
        filter_group_desc = list(Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords)
1698
        title_keyworded_items = self._hard_filtered_base_query().\
1699
            filter(or_(*(filter_group_label+filter_group_desc))).\
1700
            options(joinedload('children_revisions')).\
1701
            options(joinedload('parent'))
1702
1703
        return title_keyworded_items
1704
1705
    def get_all_types(self) -> typing.List[ContentType]:
1706
        labels = CONTENT_TYPES.endpoint_allowed_types_slug()
1707
        content_types = []
1708
        for label in labels:
1709
            content_types.append(CONTENT_TYPES.get_one_by_slug(label))
1710
1711
        return content_types
1712
1713
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1714
    def exclude_unavailable(
1715
        self,
1716
        contents: typing.List[Content],
1717
    ) -> typing.List[Content]:
1718
        """
1719
        Update and return list with content under archived/deleted removed.
1720
        :param contents: List of contents to parse
1721
        """
1722
        for content in contents[:]:
1723
            if self.content_under_deleted(content) or self.content_under_archived(content):
1724
                contents.remove(content)
1725
        return contents
1726
1727
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1728
    def content_under_deleted(self, content: Content) -> bool:
1729
        if content.parent:
1730
            if content.parent.is_deleted:
1731
                return True
1732
            if content.parent.parent:
1733
                return self.content_under_deleted(content.parent)
1734
        return False
1735
1736
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1737
    def content_under_archived(self, content: Content) -> bool:
1738
        if content.parent:
1739
            if content.parent.is_archived:
1740
                return True
1741
            if content.parent.parent:
1742
                return self.content_under_archived(content.parent)
1743
        return False
1744
1745
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1746
    def find_one_by_unique_property(
1747
            self,
1748
            property_name: str,
1749
            property_value: str,
1750
            workspace: Workspace=None,
1751
    ) -> Content:
1752
        """
1753
        Return Content who contains given property.
1754
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
1755
        contains this property value.
1756
        :param property_name: Name of property
1757
        :param property_value: Value of property
1758
        :param workspace: Workspace who contains Content
1759
        :return: Found Content
1760
        """
1761
        # TODO - 20160602 - Bastien: Should be JSON type query
1762
        # see https://www.compose.io/articles/using-json-extensions-in-\
1763
        # postgresql-from-python-2/
1764
        query = self._base_query(workspace=workspace).filter(
1765
            Content._properties.like(
1766
                '%"{property_name}": "{property_value}"%'.format(
1767
                    property_name=property_name,
1768
                    property_value=property_value,
1769
                )
1770
            )
1771
        )
1772
        return query.one()
1773
1774
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1775
    def generate_folder_label(
1776
            self,
1777
            workspace: Workspace,
1778
            parent: Content=None,
1779
    ) -> str:
1780
        """
1781
        Generate a folder label
1782
        :param workspace: Future folder workspace
1783
        :param parent: Parent of foture folder (can be None)
1784
        :return: Generated folder name
1785
        """
1786
        _ = self.translator.get_translation
1787
        query = self._base_query(workspace=workspace)\
1788
            .filter(Content.label.ilike('{0}%'.format(
1789
                _('New folder'),
1790
            )))
1791
        if parent:
1792
            query = query.filter(Content.parent == parent)
1793
1794
        return _('New folder {0}').format(
1795
            query.count() + 1,
1796
        )
1797