Passed
Pull Request — develop (#87)
by inkhey
01:43
created

ContentApi.save()   B

Complexity

Conditions 7

Size

Total Lines 37
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 37
rs 8
c 0
b 0
f 0
cc 7
nop 5
1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import typing
6
from contextlib import contextmanager
7
8
import sqlalchemy
9
import transaction
10
from depot.io.utils import FileIntent
11
from depot.manager import DepotManager
12
from preview_generator.exception import UnsupportedMimeType
13
from preview_generator.manager import PreviewManager
14
from sqlalchemy import desc
15
from sqlalchemy import func
16
from sqlalchemy import or_
17
from sqlalchemy.orm import Query
18
from sqlalchemy.orm import aliased
19
from sqlalchemy.orm import joinedload
20
from sqlalchemy.orm.attributes import QueryableAttribute
21
from sqlalchemy.orm.attributes import get_history
22
from sqlalchemy.orm.exc import NoResultFound
23
from sqlalchemy.orm.session import Session
24
from sqlalchemy.sql.elements import and_
25
26
from tracim_backend.app_models.contents import content_status_list
27
from tracim_backend.app_models.contents import content_type_list
28
from tracim_backend.app_models.contents import FOLDER_TYPE
29
from tracim_backend.app_models.contents import ContentStatus
30
from tracim_backend.app_models.contents import ContentType
31
from tracim_backend.app_models.contents import GlobalStatus
32
from tracim_backend.exceptions import ContentInNotEditableState
33
from tracim_backend.exceptions import ContentLabelAlreadyUsedHere
34
from tracim_backend.exceptions import ContentNotFound
35
from tracim_backend.exceptions import ContentTypeNotExist
36
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
37
from tracim_backend.exceptions import EmptyLabelNotAllowed
38
from tracim_backend.exceptions import PageOfPreviewNotFound
39
from tracim_backend.exceptions import PreviewDimNotAllowed
40
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
41
from tracim_backend.exceptions import SameValueError
42
from tracim_backend.exceptions import UnallowedSubContent
43
from tracim_backend.exceptions import UnavailablePreview
44
from tracim_backend.exceptions import WorkspacesDoNotMatch
45
from tracim_backend.lib.core.notifications import NotifierFactory
46
from tracim_backend.lib.utils.logger import logger
47
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
48
from tracim_backend.lib.utils.translation import Translator
49
from tracim_backend.lib.utils.utils import cmp_to_key
50
from tracim_backend.lib.utils.utils import current_date_for_filename
51
from tracim_backend.lib.utils.utils import preview_manager_page_format
52
from tracim_backend.models.auth import User
53
from tracim_backend.models.context_models import ContentInContext
54
from tracim_backend.models.context_models import PreviewAllowedDim
55
from tracim_backend.models.context_models import RevisionInContext
56
from tracim_backend.models.data import ActionDescription
57
from tracim_backend.models.data import Content
58
from tracim_backend.models.data import ContentRevisionRO
59
from tracim_backend.models.data import NodeTreeItem
60
from tracim_backend.models.data import RevisionReadStatus
61
from tracim_backend.models.data import UserRoleInWorkspace
62
from tracim_backend.models.data import Workspace
63
from tracim_backend.models.revision_protection import new_revision
64
65
__author__ = 'damien'
66
67
68
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
69
def compare_content_for_sorting_by_type_and_name(
        content1: Content,
        content2: Content
) -> int:
    """
    Three-way comparison of two contents: by type first, then by label.

    Contents sharing the same type are ordered by their lower-cased label.
    Contents of different types are ordered by the position of their type
    slug in a fixed list: folder, page, thread, file.

    :param content1: left-hand content
    :param content2: right-hand content
    :return:    1 if content1 > content2
                -1 if content1 < content2
                0 if content1 = content2
    :raise ValueError: when the types differ and one of them is not in the
        fixed type list (the list.index() lookup fails)
    """
    if content1.type != content2.type:
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
        type_ranking = [
            content_type_list.Folder.slug,
            content_type_list.Page.slug,
            content_type_list.Thread.slug,
            content_type_list.File.slug,
        ]
        left_rank = type_ranking.index(content1.type)
        right_rank = type_ranking.index(content2.type)
        # bool arithmetic yields the -1 / 0 / 1 sign of the comparison
        return (left_rank > right_rank) - (left_rank < right_rank)

    # Same type: fall back to a case-insensitive label comparison
    left_label = content1.get_label().lower()
    right_label = content2.get_label().lower()
    if left_label > right_label:
        return 1
    if left_label < right_label:
        return -1
    return 0
106
107
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
108
def compare_tree_items_for_sorting_by_type_and_name(
        item1: NodeTreeItem,
        item2: NodeTreeItem
) -> int:
    """
    Compare two tree items by delegating to the content comparison on the
    node each item wraps.

    :param item1: left-hand tree item
    :param item2: right-hand tree item
    :return: result of compare_content_for_sorting_by_type_and_name applied
        to item1.node and item2.node
    """
    left_node = item1.node
    right_node = item2.node
    return compare_content_for_sorting_by_type_and_name(left_node, right_node)
113
114
115
class ContentApi(object):
116
117
    SEARCH_SEPARATORS = ',| '
118
    SEARCH_DEFAULT_RESULT_NB = 50
119
120
    # DISPLAYABLE_CONTENTS = (
121
    #     content_type_list.Folder.slug,
122
    #     content_type_list.File.slug,
123
    #     content_type_list.Comment.slug,
124
    #     content_type_list.Thread.slug,
125
    #     content_type_list.Page.slug,
126
    #     content_type_list.Page.slugLegacy,
127
    #     ContentType.MarkdownPage,
128
    # )
129
130
    def __init__(
            self,
            session: Session,
            current_user: typing.Optional[User],
            config,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
            show_active: bool = True,
            all_content_in_treeview: bool = True,
            force_show_all_types: bool = False,
            disable_user_workspaces_filter: bool = False,
    ) -> None:
        """
        Store the session, user, config and visibility flags, then build the
        preview manager and the translator.

        :param session: database session used for every query
        :param current_user: user on whose behalf queries run, or None
        :param config: application config; PREVIEW_CACHE_DIR is read here
        :param show_archived: include archived contents
        :param show_deleted: include deleted contents
        :param show_temporary: include temporary contents
        :param show_active: when False, keep only deleted/archived contents
        :param all_content_in_treeview: stored as
            _show_all_type_of_contents_in_treeview
        :param force_show_all_types: skip the allowed-types filter
        :param disable_user_workspaces_filter: skip the filter restricting
            contents to the workspaces of the user
        """
        # Execution context
        self._session = session
        self._user = current_user
        self._config = config
        self._user_id = current_user.user_id if current_user else None

        # Visibility flags
        self._show_archived = show_archived
        self._show_deleted = show_deleted
        self._show_temporary = show_temporary
        self._show_active = show_active
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
        self._force_show_all_types = force_show_all_types
        self._disable_user_workspaces_filter = disable_user_workspaces_filter

        self.preview_manager = PreviewManager(
            self._config.PREVIEW_CACHE_DIR,
            create_folder=True,
        )

        # Language of the user when set, fallback language otherwise
        user_lang = self._user.lang if self._user else None
        default_lang = user_lang or DEFAULT_FALLBACK_LANG
        self.translator = Translator(
            app_config=self._config,
            default_lang=default_lang,
        )
161
162
    @contextmanager
163
    def show(
164
            self,
165
            show_archived: bool=False,
166
            show_deleted: bool=False,
167
            show_temporary: bool=False,
168
    ) -> typing.Generator['ContentApi', None, None]:
169
        """
170
        Use this method as context manager to update show_archived,
171
        show_deleted and show_temporary properties during context.
172
        :param show_archived: show archived contents
173
        :param show_deleted:  show deleted contents
174
        :param show_temporary:  show temporary contents
175
        """
176
        previous_show_archived = self._show_archived
177
        previous_show_deleted = self._show_deleted
178
        previous_show_temporary = self._show_temporary
179
180
        try:
181
            self._show_archived = show_archived
182
            self._show_deleted = show_deleted
183
            self._show_temporary = show_temporary
184
            yield self
185
        finally:
186
            self._show_archived = previous_show_archived
187
            self._show_deleted = previous_show_deleted
188
            self._show_temporary = previous_show_temporary
189
190
    def get_content_in_context(self, content: Content) -> ContentInContext:
        """
        Wrap a content into a ContentInContext built with the session, config
        and user of this api.

        :param content: content to wrap
        :return: the ContentInContext wrapper
        """
        content_in_context = ContentInContext(
            content,
            self._session,
            self._config,
            self._user,
        )
        return content_in_context
192
193
    def get_revision_in_context(
            self,
            revision: ContentRevisionRO
    ) -> RevisionInContext:
        """
        Wrap a revision into a RevisionInContext built with the session,
        config and user of this api.

        :param revision: revision to wrap
        :return: the RevisionInContext wrapper
        """
        # TODO - G.M - 2018-06-173 - create revision in context object
        revision_in_context = RevisionInContext(
            revision,
            self._session,
            self._config,
            self._user,
        )
        return revision_in_context
196
    
197
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
        """
        Build the join condition between Content and ContentRevisionRO which
        keeps, for each content, only the revision with the highest
        revision_id.

        :return: Content/ContentRevision query join condition
        """
        # Correlated sub-query: highest revision_id of the current Content
        last_revision_id = self._session.query(ContentRevisionRO.revision_id) \
            .filter(ContentRevisionRO.content_id == Content.id) \
            .order_by(ContentRevisionRO.revision_id.desc()) \
            .limit(1) \
            .correlate(Content)

        return and_(
            Content.id == ContentRevisionRO.content_id,
            ContentRevisionRO.revision_id == last_revision_id,
        )
209
210
    def get_canonical_query(self) -> Query:
        """
        Build the base Content query, joined with ContentRevisionRO using the
        condition returned by _get_revision_join().

        :return: Content/ContentRevision Query
        """
        join_condition = self._get_revision_join()
        content_query = self._session.query(Content)
        return content_query.join(ContentRevisionRO, join_condition)
217
218
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
219
    @classmethod
    def sort_tree_items(
        cls,
        content_list: typing.List[NodeTreeItem],
    ) -> typing.List[NodeTreeItem]:
        """
        Sort tree items in place, by type then by label of their node.

        :param content_list: tree items to sort; the list itself is modified
        :return: the same list object, now sorted
        """
        # The ordering is defined by a cmp-style function, hence cmp_to_key
        content_list.sort(key=cmp_to_key(
            compare_tree_items_for_sorting_by_type_and_name,
        ))
        return content_list
233
234
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
235
    @classmethod
    def sort_content(
        cls,
        content_list: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Sort contents in place, by type then by label.

        :param content_list: contents to sort; the list itself is modified
        :return: the same list object, now sorted
        """
        sort_key = cmp_to_key(compare_content_for_sorting_by_type_and_name)
        content_list.sort(key=sort_key)
        return content_list
242
243
    def __real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        """
        Build the canonical content query restricted by allowed content
        types, workspace and user access. Deleted/archived/temporary flags
        are not handled here.

        :param workspace: when given, keep only contents of this workspace
        :return: the filtered Content query
        """
        query = self.get_canonical_query()

        # Exclude non displayable types
        if not self._force_show_all_types:
            allowed_slugs = content_type_list.query_allowed_types_slugs()
            query = query.filter(Content.type.in_(allowed_slugs))

        if workspace:
            query = query.filter(
                Content.workspace_id == workspace.workspace_id
            )

        # Security layer: if user provided, filter
        # with user workspaces privileges
        if self._user and not self._disable_user_workspaces_filter:
            user = self._session.query(User).get(self._user_id)
            # Workspaces where the user role is at least READER
            readable_workspace_ids = [
                role.workspace_id
                for role in user.roles
                if role.role >= UserRoleInWorkspace.READER
            ]
            in_readable_workspace = Content.workspace_id.in_(
                readable_workspace_ids
            )
            # And allow access to non workspace document when he is owner
            owned_without_workspace = and_(
                Content.workspace_id == None,  # noqa: E711 (SQL expression)
                Content.owner_id == self._user_id,
            )
            query = query.filter(
                or_(in_readable_workspace, owned_without_workspace)
            )

        return query
273
274
    def _base_query(self, workspace: Workspace=None) -> Query:
        """
        Build the content query honouring the show_active, show_deleted,
        show_archived and show_temporary flags of this api.

        :param workspace: when given, keep only contents of this workspace
        :return: the filtered Content query
        """
        query = self.__real_base_query(workspace)

        if not self._show_active:
            # Active contents hidden: keep only deleted or archived ones
            query = query.filter(or_(
                Content.is_deleted==True,  # noqa: E712 (SQL expression)
                Content.is_archived==True,  # noqa: E712 (SQL expression)
            ))

        # Each state which is not shown is filtered out
        state_filters = (
            (self._show_deleted, Content.is_deleted),
            (self._show_archived, Content.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in state_filters:
            if not is_shown:
                query = query.filter(state_column==False)  # noqa: E712

        return query
292
293
    def __revisions_real_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the revision query restricted by allowed content types,
        workspace and user access. Deleted/archived/temporary flags are not
        handled here.

        :param workspace: when given, keep only revisions of this workspace
        :return: the filtered ContentRevisionRO query
        """
        result = self._session.query(ContentRevisionRO)

        # Exclude non displayable types.
        # DISPLAYABLE_CONTENTS is commented out on the class, so the allowed
        # slugs are taken from content_type_list, as __real_base_query does.
        # NOTE(review): this filters on Content.type although the query
        # selects ContentRevisionRO with no explicit join to Content -
        # confirm whether a join or a revision-level column is intended.
        if not self._force_show_all_types:
            allowed_slugs = content_type_list.query_allowed_types_slugs()
            result = result.filter(Content.type.in_(allowed_slugs))

        if workspace:
            result = result.filter(
                ContentRevisionRO.workspace_id == workspace.workspace_id
            )

        if self._user:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces
            workspace_ids = [
                r.workspace_id
                for r in user.roles
                if r.role >= UserRoleInWorkspace.READER
            ]
            result = result.filter(
                ContentRevisionRO.workspace_id.in_(workspace_ids)
            )

        return result
314
315
    def _revisions_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the revision query honouring the show_deleted, show_archived
        and show_temporary flags of this api.

        :param workspace: when given, keep only revisions of this workspace
        :return: the filtered ContentRevisionRO query
        """
        query = self.__revisions_real_base_query(workspace)

        # Each state which is not shown is filtered out. The temporary state
        # is read on Content, the two others on the revision.
        state_filters = (
            (self._show_deleted, ContentRevisionRO.is_deleted),
            (self._show_archived, ContentRevisionRO.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in state_filters:
            if not is_shown:
                query = query.filter(state_column==False)  # noqa: E712

        return query
331
332
    def _hard_filtered_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Same as _base_query, but the deleted/archived/temporary filters are
        applied to the parent of each content as well as to the content.
        This is required for search() function which also search in comments
        (for example) which may be 'not deleted' while the associated
        content is deleted.

        NOTE(review): each filter joins on Content.parent; contents without
        parent look excluded by that join - confirm this is intended.

        :param workspace: when given, keep only contents of this workspace
        :return: the filtered Content query
        """
        query = self.__real_base_query(workspace)

        state_filters = (
            (self._show_deleted, 'is_deleted'),
            (self._show_archived, 'is_archived'),
            (self._show_temporary, 'is_temporary'),
        )
        for is_shown, state_name in state_filters:
            if is_shown:
                continue
            # A fresh alias per filter, so that each join is distinct
            parent = aliased(Content)
            query = query.join(parent, Content.parent) \
                .filter(getattr(Content, state_name) == False) \
                .filter(getattr(parent, state_name) == False)

        return query
366
367
    def get_base_query(
        self,
        workspace: Workspace,
    ) -> Query:
        """
        Public access to the base content query of a workspace.

        :param workspace: workspace the contents must belong to
        :return: the query built by _base_query for this workspace
        """
        base_query = self._base_query(workspace)
        return base_query
372
373
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
374
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
375
    #     """
376
    #     This method returns child items (folders or items) for left bar treeview.
377
    # 
378
    #     :param parent:
379
    #     :param workspace:
380
    #     :param filter_by_allowed_content_types:
381
    #     :param removed_item_ids:
382
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
383
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
384
    #     :return:
385
    #     """
386
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
387
    #     removed_item_ids = removed_item_ids or []  # FDV
388
    # 
389
    #     if not allowed_node_types:
390
    #         allowed_node_types = [content_type_list.Folder.slug]
391
    #     elif allowed_node_types==content_type_list.Any_SLUG:
392
    #         allowed_node_types = ContentType.all()
393
    # 
394
    #     parent_id = parent.content_id if parent else None
395
    #     folders = self._base_query(workspace).\
396
    #         filter(Content.parent_id==parent_id).\
397
    #         filter(Content.type.in_(allowed_node_types)).\
398
    #         filter(Content.content_id.notin_(removed_item_ids)).\
399
    #         all()
400
    # 
401
    #     if not filter_by_allowed_content_types or \
402
    #                     len(filter_by_allowed_content_types)<=0:
403
    #         # Standard case for the left treeview: we want to show all contents
404
    #         # in the left treeview... so we still filter because for example
405
    #         # comments must not appear in the treeview
406
    #         return [folder for folder in folders \
407
    #                 if folder.type in ContentType.allowed_types_for_folding()]
408
    # 
409
    #     # Now this is a case of Folders only (used for moving content)
410
    #     # When moving a content, you must get only folders that allow to be filled
411
    #     # with the type of content you want to move
412
    #     result = []
413
    #     for folder in folders:
414
    #         for allowed_content_type in filter_by_allowed_content_types:
415
    # 
416
    #             is_folder = folder.type == content_type_list.Folder.slug
417
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
418
    # 
419
    #             if is_folder and content_type__allowed:
420
    #                 result.append(folder)
421
    #                 break
422
    # 
423
    #     return result
424
425
    def _is_content_label_free(
            self,
            label: str,
            workspace: Workspace,
            parent: Content = None,
            exclude_content_id: int = None,
    ) -> bool:
        """
        Check if content label is free
        :param label: content label
        :param workspace: workspace of the content
        :param parent: parent of the content; when None, the label is
        checked among contents which have no parent
        :param exclude_content_id: exclude a specific content_id (useful
        to verify other content for content update)
        :return: True if content label is available
        """
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
        # with empty label like comment.
        assert label
        assert workspace
        query = self.get_base_query(workspace)

        if parent:
            query = query.filter(Content.parent_id == parent.content_id)
        else:
            query = query.filter(Content.parent_id == None)  # noqa: E711

        if exclude_content_id:
            query = query.filter(Content.content_id != exclude_content_id)
        query = query.filter(Content.workspace_id == workspace.workspace_id)

        nb_content_with_the_label = query.filter(Content.label == label).count()
        if nb_content_with_the_label == 0:
            return True

        if nb_content_with_the_label > 1:
            # Several contents already share this label: the database is
            # inconsistent. Log it, the label is reported as used anyway.
            critical_error_text = 'Something is wrong in the database ! '\
                                  'Content label should be unique ' \
                                  'in a same folder in database ' \
                                  'but you have {nb} content with ' \
                                  'label {label} in workspace {workspace_id}'

            critical_error_text = critical_error_text.format(
                nb=nb_content_with_the_label,
                label=label,
                workspace_id=workspace.workspace_id,
            )
            if parent:
                # Report the id of the parent itself, not of its own parent
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
                    text=critical_error_text,
                    parent_id=parent.content_id
                )
            logger.critical(self, critical_error_text)
        return False
480
481
    def _is_content_label_free_or_raise(
        self,
        label: str,
        workspace: Workspace,
        parent: Content = None,
        exclude_content_id: int = None,
    ) -> None:
        """
        Same as _is_content_label_free but raise exception instead of
        returning boolean if content label is already used
        :param label: content label
        :param workspace: workspace of the content
        :param parent: parent of the content
        :param exclude_content_id: content_id ignored by the check
        :raise ContentLabelAlreadyUsedHere: if the label is already used
        """
        if self._is_content_label_free(
            label,
            workspace,
            parent,
            exclude_content_id
        ):
            return

        text = 'A Content already exist with the same label {label} ' \
               ' in workspace {workspace_id}'.format(
                  label=label,
                  workspace_id=workspace.workspace_id,
               )
        if parent:
            # Report the id of the parent itself, not of its own parent
            text = '{text} and parent as content {parent_id}'.format(
                text=text,
                parent_id=parent.content_id
            )
        raise ContentLabelAlreadyUsedHere(text)
509
510
    def create(
            self,
            content_type_slug: str,
            workspace: Workspace,
            parent: Content=None,
            label: str = '',
            filename: str = '',
            do_save=False,
            is_temporary: bool=False,
            do_notify=True
    ) -> Content:
        """
        Create a new content, and save it when do_save is True.

        :param content_type_slug: slug of the type of the new content
        :param workspace: workspace of the content; when falsy, the workspace
        of the parent is used
        :param parent: parent content, if any
        :param label: label of the content (exclusive with filename)
        :param filename: file name of the content (exclusive with label);
        label and file extension are derived from it
        :param do_save: add the content to the session and save it
        :param is_temporary: value of the is_temporary flag of the content
        :param do_notify: forwarded to save() when do_save is True
        :return: the new content
        :raise UnallowedSubContent: if the content type is not allowed in the
        parent or in the workspace
        :raise ContentLabelAlreadyUsedHere: if the label is already used by
        another content with the same parent
        :raise EmptyLabelNotAllowed: if no label is available and the content
        is not a comment
        """
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != content_type_list.Any_SLUG
        assert not (label and filename)

        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
        # can call here with None. Must update create_comment tu require the
        # workspace.
        # Resolved first, so that every step below sees the real workspace.
        if not workspace:
            workspace = parent.workspace

        if content_type_slug == FOLDER_TYPE and not label:
            label = self.generate_folder_label(workspace, parent)

        content_type = content_type_list.get_one_by_slug(content_type_slug)
        if parent and parent.properties and 'allowed_content' in parent.properties:  # nopep8
            allowed_content = parent.properties['allowed_content']
            if content_type.slug not in allowed_content \
                    or not allowed_content[content_type.slug]:
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))

        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        content_id=workspace.workspace_id,
                    )
                )
        if filename:
            label = os.path.splitext(filename)[0]
        if label:
            self._is_content_label_free_or_raise(
                label,
                workspace,
                parent,
            )

        content = Content()
        if filename:
            # INFO - G.M - 2018-07-04 - File_name setting automatically
            # set label and file_extension.
            # The complete file name is given: the label alone has already
            # lost the extension.
            content.file_name = filename
        elif label:
            content.label = label
        else:
            if content_type_slug == content_type_list.Comment.slug:
                # INFO - G.M - 2018-07-16 - Default label for comments is
                # empty string.
                content.label = ''
            else:
                raise EmptyLabelNotAllowed('Content of this type should have a valid label')  # nopep8

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if content.type in (
                content_type_list.Page.slug,
                content_type_list.Thread.slug,
        ):
            content.file_extension = '.html'

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content
584
585
    def create_comment(
            self,
            workspace: Workspace=None,
            parent: Content=None,
            content:str ='',
            do_save=False
    ) -> Content:
        """
        Create a comment attached to a parent content.

        :param workspace: workspace forwarded to create()
        :param parent: commented content; required, and not a folder
        :param content: text of the comment, stored as its description
        :param do_save: save the comment with the COMMENT action
        :return: the new comment
        :raise EmptyCommentContentNotAllowed: if content is empty
        """
        # TODO: check parent allowed_type and workspace allowed_ type
        assert parent and parent.type != FOLDER_TYPE
        if not content:
            raise EmptyCommentContentNotAllowed()

        # The comment is built unsaved; it is saved below if requested
        comment = self.create(
            content_type_slug=content_type_list.Comment.slug,
            workspace=workspace,
            parent=parent,
            label='',
            do_save=False,
            do_notify=False,
        )
        comment.description = content
        comment.revision_type = ActionDescription.COMMENT

        if do_save:
            self.save(comment, ActionDescription.COMMENT)
        return comment
605
606
    def get_one_from_revision(
            self,
            content_id: int,
            content_type: str,
            workspace: Workspace=None,
            revision_id=None
    ) -> Content:
        """
        This method is a hack to convert a node revision item into a node:
        the content is fetched with get_one(), then its
        revision_to_serialize is set to the given revision.

        :param content_id: id of the content
        :param content_type: type given to get_one()
        :param workspace: workspace given to get_one()
        :param revision_id: id of the revision to attach to the content
        :return: the content, with revision_to_serialize set
        :raise ValueError: if the revision is not a revision of this content
        """
        content = self.get_one(content_id, content_type, workspace)
        revision_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.revision_id==revision_id
        )
        revision = revision_query.one()

        if revision.content_id!=content.content_id:
            raise ValueError('Revision not found for given content')

        content.revision_to_serialize = revision.revision_id
        return content
625
626
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
627
628
        if not content_id:
629
            return None
630
631
        base_request = self._base_query(workspace).filter(Content.content_id==content_id)
632
633
        if content_type!=content_type_list.Any_SLUG:
634
            base_request = base_request.filter(Content.type==content_type)
635
636
        if parent:
637
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
638
639
        try:
640
            content = base_request.one()
641
        except NoResultFound as exc:
642
            # TODO - G.M - 2018-07-16 - Add better support for all different
643
            # error case who can happened here
644
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
645
            # wrong access to this workspace, wrong base filter according
646
            # to content_status.
647
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
648
        return content
649
650
    def get_one_revision(
            self,
            revision_id: int = None,
            content: Content= None
    ) -> ContentRevisionRO:
        """
        This method allow us to get directly any revision with its id
        :param revision_id: id of the revision to return; must not be None
        :param content: when given, the revision must be a revision of this
        content; when None, this is not checked
        :return: the revision
        :raise RevisionDoesNotMatchThisContent: if content is given and the
        revision belongs to another content
        """
        assert revision_id is not None# DYN_REMOVE

        revision_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.revision_id == revision_id
        )
        revision = revision_query.one()

        if not content or revision.content_id == content.content_id:
            return revision

        raise RevisionDoesNotMatchThisContent(
            'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
                revision_id=revision.revision_id,
                content_id=content.content_id,
                )
        )
669
670
    # INFO - A.P - 2017-07-03 - python file object getter
671
    # in case of we cook a version of preview manager that allows a pythonic
672
    # access to files
673
    # def get_one_revision_file(self, revision_id: int = None):
674
    #     """
675
    #     This function allows us to directly get a Python file object from its
676
    #     revision identifier.
677
    #     :param revision_id: The revision id of the file we want to return
678
    #     :return: The corresponding Python file object
679
    #     """
680
    #     revision = self.get_one_revision(revision_id)
681
    #     return DepotManager.get().get(revision.depot_file)
682
683
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
        """
        Get the path of the file stored in the depot for a revision.

        :param revision_id: The revision id of the filepath we want to return
        :return: The corresponding filepath
        """
        revision = self.get_one_revision(revision_id)
        stored_file = DepotManager.get().get(revision.depot_file)
        # The path is read from the _file_path attribute of the stored file
        return stored_file._file_path
695
696
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
697
    def get_one_by_label_and_parent(
            self,
            content_label: str,
            content_parent: Content=None,
    ) -> Content:
        """
        Get the content matching a name under a given parent. The query is
        restricted to the workspace of the parent when a parent is given.

        :param content_label: name of the content; it is split into file name
        and extension to match files, threads and pages, and used as is to
        match folders
        :param content_parent: parent of the content; when None, the content
        must have no parent
        :return The corresponding Content
        """
        if content_parent:
            workspace = content_parent.workspace
            parent_id = content_parent.content_id
        else:
            workspace = None
            parent_id = None

        query = self._base_query(workspace).filter(
            Content.parent_id == parent_id
        )

        file_name, file_extension = os.path.splitext(content_label)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        is_matching_file = and_(
            Content.type == content_type_list.File.slug,
            Content.label == file_name,
            Content.file_extension == file_extension,
        )
        is_matching_thread = and_(
            Content.type == content_type_list.Thread.slug,
            Content.label == file_name,
        )
        is_matching_page = and_(
            Content.type == content_type_list.Page.slug,
            Content.label == file_name,
        )
        # Folders are matched on the complete name, extension included
        is_matching_folder = and_(
            Content.type == content_type_list.Folder.slug,
            Content.label == content_label,
        )

        return query.filter(
            or_(
                is_matching_file,
                is_matching_thread,
                is_matching_page,
                is_matching_folder,
            )
        ).one()
739
740
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
741
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: [str]=None,
    ):
        """
        Find one content from its label, its workspace and, optionally, the
        labels of its parent folders.
        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing the content
        :param content_parent_labels: Ordered list of labels representing the
            folder path (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder.
        When empty or None, only contents without parent are considered.
        :return: Found Content (the query ends with .one())
        """
        base_query = self._base_query(workspace)

        # Resolve the parent folder only when a folder path is given
        parent_folder = None
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )

        # Restrict the query to contents matching the label
        labelled_query = self.filter_query_for_content_label_as_path(
            query=base_query,
            content_label_as_file=content_label,
        )

        # Either "child of the resolved folder" or "has no parent"
        if parent_folder:
            parent_criterion = Content.parent_id == parent_folder.content_id
        else:
            parent_criterion = Content.parent_id == None

        return (
            labelled_query
            .filter(parent_criterion)
            .filter(Content.workspace_id == workspace.workspace_id)
            .order_by(Content.revision_id.desc())
            .one()
        )
793
794
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
795
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: [str],
            workspace: Workspace,
    ) -> Content:
        """
        Walk a folder path label by label and return the deepest folder.
        TODO BS 20161124: Not safe if web interface allow folder duplicate names
        :param path_labels: List of labels representing path of folder
        (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder, or None when path_labels is empty
        """
        base_query = self._base_query(workspace)
        current_folder = None

        for label in path_labels:
            # The first level has no parent; deeper levels must be children
            # of the folder found at the previous step
            if current_folder:
                parent_criterion = \
                    Content.parent_id == current_folder.content_id
            else:
                parent_criterion = Content.parent_id == None

            # Exactly one matching folder is expected at each level (.one())
            current_folder = (
                base_query
                .filter(
                    Content.type == content_type_list.Folder.slug,
                    Content.label == label,
                    Content.workspace_id == workspace.workspace_id,
                )
                .filter(parent_criterion)
                .order_by(Content.revision_id.desc())
                .one()
            )

        return current_folder
837
838
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
839
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Restrict a query to contents whose "file version" label is the given
        one.

        Files, threads and pages are matched on label + file extension (the
        given value is split with os.path.splitext); folders are matched on
        the whole value.
        :param query: query to modify
        :param content_label_as_file: label in its FILE version, see
        Content.get_label_as_file().
        :param is_case_sensitive: when False, both sides of each comparison
        are lowered with SQL lower()
        :return: modified query
        """
        file_name, file_extension = os.path.splitext(content_label_as_file)

        if is_case_sensitive:
            label_filter = Content.label == content_label_as_file
            file_name_filter = Content.label == file_name
            file_extension_filter = Content.file_extension == file_extension
        else:
            label_filter = \
                func.lower(Content.label) == func.lower(content_label_as_file)
            file_name_filter = \
                func.lower(Content.label) == func.lower(file_name)
            file_extension_filter = \
                func.lower(Content.file_extension) == func.lower(file_extension)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten in order to avoid content_type hardcoded code there
        name_and_extension_slugs = (
            content_type_list.File.slug,
            content_type_list.Thread.slug,
            content_type_list.Page.slug,
        )
        criteria = [
            and_(
                Content.type == slug,
                file_name_filter,
                file_extension_filter,
            )
            for slug in name_and_extension_slugs
        ]
        criteria.append(
            and_(
                Content.type == content_type_list.Folder.slug,
                label_filter,
            )
        )
        return query.filter(or_(*criteria))
890
891
    def get_pdf_preview_path(
            self,
            content_id: int,
            revision_id: int,
            page_number: int
    ) -> str:
        """
        Build the pdf preview of one page of a content revision.
        :param content_id: id of content (only used in error messages)
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content; converted with preview_manager_page_format() before use
        :return: preview_path as string
        :raise PageOfPreviewNotFound: converted page number is greater than or
        equal to the page count reported by the preview manager
        :raise UnavailablePreview: the preview manager raised
        UnsupportedMimeType
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            page_count = self.preview_manager.get_page_nb(file_path)
            if page_number >= page_count:
                message = 'page_number {page_number} of content {content_id} does not exist'.format(  # nopep8
                    page_number=page_number,
                    content_id=content_id
                )
                raise PageOfPreviewNotFound(message)
            pdf_preview_path = self.preview_manager.get_pdf_preview(
                file_path,
                page=page_number
            )
        except UnsupportedMimeType as exc:
            message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(message) from exc
        return pdf_preview_path
923
924
    def get_full_pdf_preview_path(self, revision_id: int) -> str:
        """
        Build the full (all pages) pdf preview of a content revision.
        :param revision_id: id of revision
        :return: path of the full pdf preview of this revision
        :raise UnavailablePreview: the preview manager raised
        UnsupportedMimeType
        """
        source_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.get_pdf_preview(source_path)
        except UnsupportedMimeType as exc:
            message = 'No preview available for revision {}'.format(revision_id)
            raise UnavailablePreview(message) from exc
938
939
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Bundle the jpg preview dimension settings of the configuration.
        :return: PreviewAllowedDim built from PREVIEW_JPG_RESTRICTED_DIMS and
        PREVIEW_JPG_ALLOWED_DIMS, in that order
        """
        config = self._config
        restricted = config.PREVIEW_JPG_RESTRICTED_DIMS
        allowed_dims = config.PREVIEW_JPG_ALLOWED_DIMS
        return PreviewAllowedDim(restricted, allowed_dims)
947
948
    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Build the jpg preview of one page of a content revision.
        :param content_id: id of content (only used in error messages)
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage
        content; converted with preview_manager_page_format() before use
        :param width: width in pixel; when both width and height are falsy,
        the first entry of PREVIEW_JPG_ALLOWED_DIMS is used
        :param height: height in pixel
        :return: preview_path as string
        :raise PageOfPreviewNotFound: converted page number is greater than or
        equal to the page count reported by the preview manager
        :raise PreviewDimNotAllowed: PREVIEW_JPG_RESTRICTED_DIMS is set and
        the size is not one of PREVIEW_JPG_ALLOWED_DIMS
        :raise UnavailablePreview: the preview manager raised
        UnsupportedMimeType
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            page_count = self.preview_manager.get_page_nb(file_path)
            if page_number >= page_count:
                message = 'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                    page_number=page_number,
                    revision_id=revision_id,
                    content_id=content_id,
                )
                raise PageOfPreviewNotFound(message)

            # No size requested: fall back to the first allowed dimension
            if not width and not height:
                default_dim = self._config.PREVIEW_JPG_ALLOWED_DIMS[0]
                width = default_dim.width
                height = default_dim.height

            dim_is_allowed = any(
                width == dim.width and height == dim.height
                for dim in self._config.PREVIEW_JPG_ALLOWED_DIMS
            )
            if not dim_is_allowed and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
                message = 'Size {width}x{height} is not allowed for jpeg preview'.format(  # nopep8
                    width=width,
                    height=height,
                )
                raise PreviewDimNotAllowed(message)

            jpg_preview_path = self.preview_manager.get_jpeg_preview(
                file_path,
                page=page_number,
                width=width,
                height=height,
            )
        except UnsupportedMimeType as exc:
            message = 'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            raise UnavailablePreview(message) from exc
        return jpg_preview_path
1004
1005
    def _get_all_query(
        self,
        parent_id: int = None,
        content_type_slug: str = content_type_list.Any_SLUG,
        workspace: Workspace = None,
        label: str = None,
        order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
    ) -> Query:
        """
        Build the filtered and ordered query behind the "get all" methods.
        :param parent_id: truthy value: keep only children of this content;
        0 or False: keep only contents without parent; None: no parent filter
        :param content_type_slug: filter by content_type slug (slug aliases
        of the content type are accepted too); Any_SLUG disables the filter
        :param workspace: workspace given to the base query
        :param label: when truthy, keep contents whose label contains this
        value (ilike)
        :param order_by_properties: order by criteria, can be both string of
        attribute or attribute of Model object from sqlalchemy(preferred way,
        QueryableAttribute object)
        :return: Query object
        """
        assert parent_id is None or isinstance(parent_id, int)
        assert content_type_slug is not None
        query = self._base_query(workspace)

        if content_type_slug != content_type_list.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type = content_type_list.get_one_by_slug(content_type_slug)
            accepted_slugs = [content_type.slug]
            if content_type.slug_alias:
                accepted_slugs.extend(content_type.slug_alias)
            query = query.filter(Content.type.in_(accepted_slugs))

        if parent_id:
            query = query.filter(Content.parent_id == parent_id)
        elif parent_id == 0 or parent_id is False:
            query = query.filter(Content.parent_id == None)

        if label:
            query = query.filter(Content.label.ilike('%{}%'.format(label)))  # nopep8

        for ordering in order_by_properties or []:
            query = query.order_by(ordering)

        return query
1048
1049
    def get_all(
            self,
            parent_id: int=None,
            content_type: str=content_type_list.Any_SLUG,
            workspace: Workspace=None,
            label: str=None,
            order_by_properties: typing.Optional[typing.List[typing.Union[str, QueryableAttribute]]] = None,  # nopep8
    ) -> typing.List[Content]:
        """
        Run the query built by _get_all_query() and return its rows.
        :param parent_id: filter by parent_id
        :param content_type: filter by content_type slug
        :param workspace: filter by workspace
        :param label: filter by label
        :param order_by_properties: order by criteria, can be both string of
        attribute or attribute of Model object from sqlalchemy(preferred way,
        QueryableAttribute object)
        :return: List of contents
        """
        query = self._get_all_query(
            parent_id,
            content_type,
            workspace,
            label,
            order_by_properties or [],
        )
        return query.all()
1069
1070
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1071
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
1072
    #     """
1073
    #     Return parent_id childs of given content_types
1074
    #     :param parent_id: parent id
1075
    #     :param content_types: list of types
1076
    #     :param workspace: workspace filter
1077
    #     :return: list of content
1078
    #     """
1079
    #     resultset = self._base_query(workspace)
1080
    #     resultset = resultset.filter(Content.type.in_(content_types))
1081
    #
1082
    #     if parent_id:
1083
    #         resultset = resultset.filter(Content.parent_id==parent_id)
1084
    #     if parent_id is False:
1085
    #         resultset = resultset.filter(Content.parent_id == None)
1086
    #
1087
    #     return resultset.all()
1088
1089
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1090
    # TODO find an other name to filter on is_deleted / is_archived
1091
    def get_all_with_filter(
            self,
            parent_id: int=None,
            content_type: str=content_type_list.Any_SLUG,
            workspace: Workspace=None,
    ) -> typing.List[Content]:
        """
        Return contents of a parent whose is_deleted, is_archived and
        is_temporary flags equal this api's _show_deleted, _show_archived and
        _show_temporary values.
        :param parent_id: id of the parent, None for contents without parent
        :param content_type: content type to keep, Any_SLUG for all types
        :param workspace: workspace given to the base query
        :return: list of contents
        """
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        criteria = []
        if content_type != content_type_list.Any_SLUG:
            criteria.append(Content.type == content_type)
        criteria.extend([
            Content.is_deleted == self._show_deleted,
            Content.is_archived == self._show_archived,
            Content.is_temporary == self._show_temporary,
            Content.parent_id == parent_id,
        ])

        # Several criteria given to one filter() call are joined with AND
        return self._base_query(workspace).filter(*criteria).all()
1108
1109
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1110
    def get_all_without_exception(
            self,
            content_type: str,
            workspace: Workspace=None,
    ) -> typing.List[Content]:
        """
        Return every content of the base query, optionally of a single type.
        :param content_type: content type to keep, Any_SLUG for all types
        :param workspace: workspace given to the base query
        :return: list of contents
        """
        assert content_type is not None  # DYN_REMOVE

        query = self._base_query(workspace)
        if content_type == content_type_list.Any_SLUG:
            return query.all()
        return query.filter(Content.type == content_type).all()
1119
1120
    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        """
        Prepare the selection of revisions not yet read by the current user.

        NOTE(review): as written, this method only builds a subquery and then
        ends without a return statement, so it returns None despite the
        annotated return type; the limit parameter is never used. It looks
        like a truncated copy of a longer implementation - confirm whether it
        should be completed or removed.
        :param parent_id: id of the parent content or None (only checked by
        an assert here)
        :param content_type: content type slug (only checked by asserts here)
        :param workspace: workspace given to the revisions base query
        :param limit: unused in the visible body
        """
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
        assert content_type is not None# DYN_REMOVE
        assert isinstance(content_type, str) # DYN_REMOVE

        # Ids of the revisions having a read status for the current user
        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
            .filter(RevisionReadStatus.user_id==self._user_id)

        # Revisions without read status, restricted to non-deleted workspaces
        not_read_revisions = self._revisions_base_query(workspace) \
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
            .filter(Workspace.is_deleted.is_(False)) \
            .subquery()
        # NOTE(review): not_read_revisions is never used after this point
1134
1135
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
1136
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
1137
    #     assert content_type is not None# DYN_REMOVE
1138
    #
1139
    #     resultset = self._base_query(workspace)
1140
    #
1141
    #     if content_type != content_type_list.Any_SLUG:
1142
    #         resultset = resultset.filter(Content.type==content_type)
1143
    #
1144
    #     return resultset.all()
1145
1146
    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]= None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        List contents by descending update date, a comment counting as an
        update of its parent content.
        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: when given, only contents found after this one
        in the ordered results are returned
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content
        """
        query = self._get_all_query(
            workspace=workspace,
        )
        if content_ids:
            comment_of_selected_content = and_(
                Content.parent_id.in_(content_ids),
                Content.type == content_type_list.Comment.slug
            )
            query = query.filter(
                or_(
                    Content.content_id.in_(content_ids),
                    comment_of_selected_content,
                )
            )
        query = query.order_by(desc(Content.updated))

        selected = []
        skipped_as_too_recent = []
        before_content_reached = False
        for candidate in query:
            # A comment stands for the content it is attached to
            if content_type_list.Comment.slug == candidate.type:
                target = candidate.parent
            else:
                target = candidate

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            if not self._show_deleted and target.is_deleted:
                continue
            if not self._show_archived and target.is_archived:
                continue

            already_seen = target in selected or target in skipped_as_too_recent  # nopep8
            if not already_seen:
                # Until before_content is met, contents are only remembered
                # so that they are not handled twice
                if before_content and not before_content_reached:
                    skipped_as_too_recent.append(target)
                else:
                    selected.append(target)

                if before_content and target == before_content:
                    before_content_reached = True

            if limit and len(selected) >= limit:
                break

        return selected
1212
1213
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1214
    # usable and efficient for tracim v2 to get content with read/unread status
1215
    # instead of relying on has_new_information_for()
1216
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1217
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1218
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1219
    #     assert content_type is not None# DYN_REMOVE
1220
    #     assert isinstance(content_type, str) # DYN_REMOVE
1221
    #
1222
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1223
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1224
    #
1225
    #     not_read_revisions = self._revisions_base_query(workspace) \
1226
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1227
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1228
    #         .filter(Workspace.is_deleted.is_(False)) \
1229
    #         .subquery()
1230
    #
1231
    #     not_read_content_ids_query = self._session.query(
1232
    #         distinct(not_read_revisions.c.content_id)
1233
    #     )
1234
    #     not_read_content_ids = list(map(
1235
    #         itemgetter(0),
1236
    #         not_read_content_ids_query,
1237
    #     ))
1238
    #
1239
    #     not_read_contents = self._base_query(workspace) \
1240
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1241
    #         .order_by(desc(Content.updated))
1242
    #
1243
    #     if content_type != content_type_list.Any_SLUG:
1244
    #         not_read_contents = not_read_contents.filter(
1245
    #             Content.type==content_type)
1246
    #     else:
1247
    #         not_read_contents = not_read_contents.filter(
1248
    #             Content.type!=content_type_list.Folder.slug)
1249
    #
1250
    #     if parent_id:
1251
    #         not_read_contents = not_read_contents.filter(
1252
    #             Content.parent_id==parent_id)
1253
    #
1254
    #     result = []
1255
    #     for item in not_read_contents:
1256
    #         new_item = None
1257
    #         if content_type_list.Comment.slug == item.type:
1258
    #             new_item = item.parent
1259
    #         else:
1260
    #             new_item = item
1261
    #
1262
    #         # INFO - D.A. - 2015-05-20
1263
    #         # We do not want to show only one item if the last 10 items are
1264
    #         # comments about one thread for example
1265
    #         if new_item not in result:
1266
    #             result.append(new_item)
1267
    #
1268
    #         if len(result) >= limit:
1269
    #             break
1270
    #
1271
    #     return result
1272
1273
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        Store the allowed sub-content types in the content properties.

        The properties are copied, updated and assigned back instead of being
        modified in place.
        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        updated_properties = content.properties.copy()
        updated_properties.update(allowed_content=allowed_content_dict)
        content.properties = updated_properties
1288
1289
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        Define which content types are accepted as sub-content.
        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :return: nothing
        :raise ContentTypeNotExist: a slug is not part of
        content_type_list.endpoint_allowed_types_slug()
        """
        allowed_content_dict = {}
        for slug in allowed_content_type_slug_list:
            if slug in content_type_list.endpoint_allowed_types_slug():
                allowed_content_dict[slug] = True
            else:
                raise ContentTypeNotExist('Content_type {} does not exist'.format(slug))  # nopep8

        self._set_allowed_content(content, allowed_content_dict)
1303
1304
    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Drop the 'allowed_content' entry of the content properties, if any,
        so that the default allowed content types apply again.
        :param content: the given content instance
        :return: nothing
        """
        raw_properties = content._properties
        if not raw_properties or 'allowed_content' not in raw_properties:
            # Nothing stored: properties are left untouched
            return

        remaining_properties = content.properties.copy()
        remaining_properties.pop('allowed_content')
        content.properties = remaining_properties
1314
1315
    def set_status(self, content: Content, new_status: str):
        """
        Change the status of a content and mark the revision as a status
        update.
        :param content: content to update
        :param new_status: one of content_status_list.get_all_slugs_values()
        :raise ValueError: new_status is not an allowed value
        """
        if new_status not in content_status_list.get_all_slugs_values():
            raise ValueError('The given value {} is not allowed'.format(new_status))

        content.status = new_status
        content.revision_type = ActionDescription.STATUS_UPDATE
1321
1322
    def move(self,
             item: Content,
             new_parent: Content,
             must_stay_in_same_workspace: bool=True,
             new_workspace: Workspace=None,
    ):
        """
        Move a content under a new parent and/or into a new workspace.

        Workspace consistency is checked before the item is modified, so a
        rejected move because of mismatching workspaces leaves the item
        untouched.
        :param item: content to move
        :param new_parent: new parent of the content, may be None
        :param must_stay_in_same_workspace: when True, a new parent living in
        another workspace than the item is refused
        :param new_workspace: workspace to move the content to; when not
        given, the workspace of new_parent (if any) is used
        :raise ValueError: the item should stay in its workspace but
        new_parent belongs to another one
        :raise WorkspacesDoNotMatch: new_parent does not belong to
        new_workspace
        """
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        # Validate before mutating: this check only depends on the arguments
        if new_workspace and new_parent and \
                new_parent.workspace_id != new_workspace.workspace_id:
            raise WorkspacesDoNotMatch(
                'new parent workspace and new workspace should be the same.'
            )

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
        elif new_parent:
            item.workspace = new_parent.workspace

        # The label must be free at the destination (the item itself excluded)
        self._is_content_label_free_or_raise(
            item.label,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.revision_type = ActionDescription.MOVE
1351
1352
    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revision included. Children not included, see
        "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item; when not given,
        the copy stays in the parent and workspace of the original item
        :param new_label: new label of the new copied item; when not given,
        the label of the original item is kept
        :param do_save: save the copy (with a COPY action) or not
        :param do_notify: notify copy or not (only used when do_save is True)
        :return: Newly copied item
        :raise ValueError: neither a new parent nor a new label is given, or
        both are the same as those of the original item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something else than value error
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label

        self._is_content_label_free_or_raise(label, workspace, parent)
        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.label = label
            rev.revision_type = ActionDescription.COPY
            # Copy then reassign the properties, as done in
            # _set_allowed_content(): an in-place change of the value returned
            # by the properties accessor may not be stored
            properties = rev.properties.copy()
            properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
            rev.properties = properties
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content
1400
1401
    def copy_children(self, origin_content: Content, new_content: Content):
        """
        Copy each child of a content under another content, using copy().
        :param origin_content: content whose children are copied
        :param new_content: parent given to each copied child
        """
        children = origin_content.children
        for source_child in children:
            self.copy(source_child, new_content)
1404
1405
    def move_recursively(self, item: Content,
                         new_parent: Content, new_workspace: Workspace):
        """
        Move a content and all its descendants to another workspace.

        The item is moved (without the "same workspace" restriction) and
        saved without notification, then each child is moved the same way,
        inside its own new revision, with the item as parent.
        :param item: content to move
        :param new_parent: new parent of the content
        :param new_workspace: workspace to move the contents to
        """
        self.move(
            item,
            new_parent,
            must_stay_in_same_workspace=False,
            new_workspace=new_workspace,
        )
        self.save(item, do_notify=False)

        for child in item.children:
            child_revision = new_revision(
                session=self._session,
                tm=transaction.manager,
                content=child
            )
            with child_revision:
                self.move_recursively(child, item, new_workspace)
1418
1419
    def is_editable(self, item: Content) -> bool:
        """
        Tell if a content can be modified: it must not be read-only, it must
        be active and its status must be editable.
        :param item: content to check
        :return: True when the content is editable
        """
        if item.is_readonly:
            return False
        return item.is_active and item.get_status().is_editable()
1423
1424
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        """
        Change the label and, optionally, the description of a content.
        :param item: content to update, must be editable
        :param new_label: new label, must not be empty and must be free in
        the workspace/parent of the content
        :param new_content: new description; when falsy, the current
        description is kept
        :return: the updated content
        :raise ContentInNotEditableState: the content is not editable
        :raise SameValueError: label and description are both unchanged
        :raise EmptyLabelNotAllowed: new_label is empty
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8

        nothing_changed = \
            item.label == new_label and item.description == new_content
        if nothing_changed:
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
            # Internatization disabled in libcontent for now.
            raise SameValueError('The content did not changed')
        if not new_label:
            raise EmptyLabelNotAllowed()

        self._is_content_label_free_or_raise(
            new_label,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )

        item.owner = self._user
        item.label = new_label
        # TODO: convert urls into links
        item.description = new_content or item.description
        item.revision_type = ActionDescription.EDITION
        return item
1446
1447
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        """
        Replace the file attached to a content and flag the change as a
        revision. The content is not saved here.
        :param item: content to update
        :param new_filename: new file name
        :param new_mimetype: new file mimetype
        :param new_content: new file content
        :return: the updated content
        :raise ContentInNotEditableState: if the content is not editable
        :raise SameValueError: if both mimetype and file content are unchanged
        (the file name is not part of this comparison)
        """
        if not self.is_editable(item):
            raise ContentInNotEditableState("Can't update not editable file, you need to change his status or state (deleted/archived) before any change.")  # nopep8

        # The stored file is only read when the mimetype is unchanged.
        mimetype_unchanged = new_mimetype == item.file_mimetype
        if mimetype_unchanged and new_content == item.depot_file.file.read():
            raise SameValueError('The content did not changed')

        item.owner = self._user
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        file_intent = FileIntent(new_content, new_filename, new_mimetype)
        item.depot_file = file_intent
        item.revision_type = ActionDescription.REVISION
        return item
1463
1464
    def archive(self, content: Content):
        """
        Flag a content as archived, owned by the current user, and rename it
        with an "archived" suffix followed by the current date.
        The content is not saved here.
        :param content: content to archive
        """
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        archived_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='archived',
            date=current_date_for_filename()
        )
        content.label = archived_label
        content.revision_type = ActionDescription.ARCHIVING
1476
1477
    def unarchive(self, content: Content):
        """
        Clear the archived flag of a content, owned by the current user.
        The label is left untouched and the content is not saved here.
        :param content: content to unarchive
        """
        content.owner = self._user
        content.is_archived = False
        content.revision_type = ActionDescription.UNARCHIVING
1481
1482
    def delete(self, content: Content):
        """
        Flag a content as deleted, owned by the current user, and rename it
        with a "deleted" suffix followed by the current date.
        The content is not saved here.
        :param content: content to delete
        """
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        deleted_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='deleted',
            date=current_date_for_filename()
        )
        content.label = deleted_label
        content.revision_type = ActionDescription.DELETION
1494
1495
    def undelete(self, content: Content):
        """
        Clear the deleted flag of a content, owned by the current user.
        The label is left untouched and the content is not saved here.
        :param content: content to undelete
        """
        content.owner = self._user
        content.is_deleted = False
        content.revision_type = ActionDescription.UNDELETION
1499
1500
    def get_preview_page_nb(self, revision_id: int) -> typing.Optional[int]:
        """
        Ask the preview manager for the page count of a revision file.
        :param revision_id: id of the revision whose file is inspected
        :return: page count given by the preview manager, or None when the
        preview manager raises UnsupportedMimeType
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.get_page_nb(revision_file_path)
        except UnsupportedMimeType:
            return None
1507
1508
    def has_pdf_preview(self, revision_id: int) -> bool:
        """
        Ask the preview manager whether a revision file has a pdf preview.
        :param revision_id: id of the revision whose file is inspected
        :return: answer of the preview manager, or False when it raises
        UnsupportedMimeType
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        try:
            return self.preview_manager.has_pdf_preview(revision_file_path)
        except UnsupportedMimeType:
            return False
1515
1516
    def mark_read__all(
            self,
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Mark as read the content of all workspaces visible for the user.
        Delegates to mark_read__workspace() with no workspace restriction.
        :param read_datetime: date of reading, forwarded unchanged
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        self.mark_read__workspace(
            None,
            read_datetime,
            do_flush,
            recursive,
        )
1530
1531
    def mark_read__workspace(
            self,
            workspace: typing.Optional[Workspace],
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Mark as read the content of a workspace visible for the user.
        Only contents returned by get_last_active() that still have new
        information for the current user are marked.
        :param workspace: workspace given to get_last_active(); None is what
        mark_read__all() passes to cover all workspaces
        :param read_datetime: date of reading, forwarded to mark_read()
        :param do_flush: flush database (forwarded to each mark_read() call)
        :param recursive: mark read subcontent too
        :return: nothing
        """
        itemset = self.get_last_active(workspace)
        for item in itemset:
            if item.has_new_information_for(self._user):
                self.mark_read(item, read_datetime, do_flush, recursive)
1548
1549
    def mark_read(
            self,
            content: Content,
            read_datetime: typing.Optional[datetime.datetime]=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> Content:
        """
        Mark every revision of a content as read by the current user.
        :param content: content to mark as read
        :param read_datetime: date of reading; datetime.datetime.now() is
        used when not given
        :param do_flush: flush database once the marking is done
        :param recursive: mark read subcontent too and, for a comment, its
        parent and the other comments of this parent
        :return: the content given as parameter
        """
        assert self._user
        assert content

        # The algorithm is:
        # 1. define the read datetime
        # 2. update all revisions related to current Content
        # 3. do the same for all child revisions
        #    (ie parent_id is content_id of current content)

        if not read_datetime:
            read_datetime = datetime.datetime.now()

        viewed_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id == content.content_id).all()

        for revision in viewed_revisions:
            revision.read_by[self._user] = read_datetime

        if recursive:
            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            # Nested calls never flush: a single flush is done at the end
            # of the outermost call.
            for child in content.get_valid_children():
                self.mark_read(child, read_datetime=read_datetime,
                               do_flush=False)

            if content_type_list.Comment.slug == content.type:
                # recursive=False here, so parent and sibling comments do
                # not trigger this branch again.
                self.mark_read(content.parent, read_datetime=read_datetime,
                               do_flush=False, recursive=False)
                for comment in content.parent.get_comments():
                    if comment != content:
                        self.mark_read(comment, read_datetime=read_datetime,
                                       do_flush=False, recursive=False)

        if do_flush:
            self.flush()

        return content
1603
1604
    def mark_unread(self, content: Content, do_flush=True) -> Content:
        """
        Remove the current user's read mark from every revision of a content
        and, recursively, of its valid children.
        :param content: content to mark as unread
        :param do_flush: flush database once the marks are removed
        :return: the content given as parameter
        """
        assert self._user
        assert content

        revision_query = self._session.query(ContentRevisionRO).filter(
            ContentRevisionRO.content_id == content.content_id
        )

        for content_revision in revision_query.all():
            try:
                del content_revision.read_by[self._user]
            except KeyError:
                # This revision was not marked as read by the user.
                pass

        # Children never flush by themselves: one flush is done below.
        for sub_content in content.get_valid_children():
            self.mark_unread(sub_content, do_flush=False)

        if do_flush:
            self.flush()

        return content
1624
1625
    def flush(self):
        """
        Flush the database session used by this api.
        """
        session = self._session
        session.flush()
1627
1628
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set on the content; must
        be None or one of ActionDescription.allowed_values(). When not given,
        the revision type already set on the content is kept, or edition is
        used if the content has none.
        :param do_flush: add the content to the session, flush it and mark
        the content as read for the current user
        :param do_notify: call do_notify() for the content
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        if not action_description and content.revision_type is None:
            # NOTE(review): a second condition used to be or-ed here,
            # "len(get_history(content.revision, 'revision_type')) <= 0".
            # get_history() returns a 3-field named tuple, so that length
            # test could never be true; it is removed without changing
            # behavior. If "default to edition when revision_type was not
            # modified" is really wanted, History.has_changes() is the
            # accurate check, but it would change what callers saving an
            # unmodified content observe - to confirm before enabling.
            action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flush because of the use
            # of triggers for content creation
            #
            # (when creating a content, actually this is an insert of a new
            # revision in content_revisions ; so the mark_read operation need
            # to get full real data from database before to be prepared.

            self._session.add(content)
            self._session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of Content
            # This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)
1665
1666
    def do_notify(self, content: Content):
        """
        Force the notification of a content update. This is what .save()
        calls when notification is requested.
        :param content: content whose update is notified
        :return: nothing
        """
        notifier = NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        )
        notifier.notify_content_update(content)
1678
1679
    def get_keywords(
            self,
            search_string: str,
            search_string_separators: str=None
    ) -> typing.List[str]:
        """
        Split a search string into keywords.
        :param search_string: a list of separated keywords (can be empty or
        None)
        :param search_string_separators: regular expression used to split the
        search string, ContentApi.SEARCH_SEPARATORS if not given
        :return: a list of str (each keyword = 1 entry), stripped, without
        empty entries
        """
        if not search_string:
            return []

        separators = search_string_separators or ContentApi.SEARCH_SEPARATORS
        stripped_keywords = (
            keyword.strip() for keyword in re.split(separators, search_string)
        )
        # Empty keywords (consecutive or trailing separators) are dropped:
        # search() would turn them into a '%%' pattern matching everything.
        return [keyword for keyword in stripped_keywords if keyword]
1692
1693
    def search(self, keywords: typing.List[str]) -> typing.Optional[Query]:
        """
        Build the query of contents whose label or description contains at
        least one of the given keywords (case-insensitive match).
        :param keywords: keywords to look for
        :return: a Query of Content items, or None when no keyword is given
        """
        if not keywords:
            return None

        # One "contains" filter per keyword on label, then on description;
        # a content matches as soon as one of them matches.
        keyword_filters = [
            Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords
        ] + [
            Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords
        ]

        return self._hard_filtered_base_query() \
            .filter(or_(*keyword_filters)) \
            .options(joinedload('children_revisions')) \
            .options(joinedload('parent'))
1709
1710
    def get_all_types(self) -> typing.List[ContentType]:
        """
        Return the content type of every slug listed by
        content_type_list.endpoint_allowed_types_slug(), in the same order.
        """
        return [
            content_type_list.get_one_by_slug(slug)
            for slug in content_type_list.endpoint_allowed_types_slug()
        ]
1717
1718
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1719
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Remove, in place, the contents located under a deleted or archived
        content and return the same list.
        :param contents: List of contents to parse
        :return: the given list, updated
        """
        unavailable_contents = [
            candidate for candidate in contents
            if self.content_under_deleted(candidate)
            or self.content_under_archived(candidate)
        ]
        for unavailable in unavailable_contents:
            contents.remove(unavailable)
        return contents
1731
1732
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1733
    def content_under_deleted(self, content: Content) -> bool:
        """
        Tell whether one of the ancestors of a content is deleted.
        The content's own deleted flag is not checked.
        :param content: content whose parent chain is walked
        :return: True if a deleted ancestor is found, False otherwise
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_deleted:
                return True
            ancestor = ancestor.parent
        return False
1740
1741
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1742
    def content_under_archived(self, content: Content) -> bool:
        """
        Tell whether one of the ancestors of a content is archived.
        The content's own archived flag is not checked.
        :param content: content whose parent chain is walked
        :return: True if an archived ancestor is found, False otherwise
        """
        ancestor = content.parent
        while ancestor:
            if ancestor.is_archived:
                return True
            ancestor = ancestor.parent
        return False
1749
1750
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1751
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return the Content whose properties contain the given name/value
        pair. The lookup is a textual LIKE match on the stored properties.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.
        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace which contains the Content
        :return: Found Content
        """
        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        base_query = self._base_query(workspace=workspace)
        like_pattern = '%"{property_name}": "{property_value}"%'.format(
            property_name=property_name,
            property_value=property_value,
        )
        matching_query = base_query.filter(
            Content._properties.like(like_pattern)
        )
        return matching_query.one()
1778
1779
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1780
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a folder label from the number of contents whose label
        already starts with the translated "New folder" text.
        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None); when given,
        only contents having this parent are counted
        :return: Generated folder name
        """
        _ = self.translator.get_translation
        query = self._base_query(workspace=workspace)
        query = query.filter(
            Content.label.ilike('{0}%'.format(_('New folder')))
        )
        if parent:
            query = query.filter(Content.parent == parent)

        label_template = _('New folder {0}')
        return label_template.format(query.count() + 1)
1802