Completed
Pull Request — develop (#31)
by inkhey
20:37 queued 14:44
created

ContentApi.set_status()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 3
1
# -*- coding: utf-8 -*-
2
from contextlib import contextmanager
3
import os
4
import datetime
5
import re
6
import typing
7
from operator import itemgetter
8
9
import transaction
10
from preview_generator.manager import PreviewManager
11
from sqlalchemy import func
12
from sqlalchemy.orm import Query
13
from depot.manager import DepotManager
14
from depot.io.utils import FileIntent
15
import sqlalchemy
16
from sqlalchemy.orm import aliased
17
from sqlalchemy.orm import joinedload
18
from sqlalchemy.orm.attributes import get_history
19
from sqlalchemy.orm.exc import NoResultFound
20
from sqlalchemy.orm.session import Session
21
from sqlalchemy import desc
22
from sqlalchemy import distinct
23
from sqlalchemy import or_
24
from sqlalchemy.sql.elements import and_
25
26
from tracim_backend.lib.utils.utils import cmp_to_key
27
from tracim_backend.lib.core.notifications import NotifierFactory
28
from tracim_backend.exceptions import SameValueError
29
from tracim_backend.exceptions import UnallowedSubContent
30
from tracim_backend.exceptions import ContentTypeNotExist
31
from tracim_backend.exceptions import PageOfPreviewNotFound
32
from tracim_backend.exceptions import PreviewDimNotAllowed
33
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
34
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
35
from tracim_backend.exceptions import EmptyLabelNotAllowed
36
from tracim_backend.exceptions import ContentNotFound
37
from tracim_backend.exceptions import WorkspacesDoNotMatch
38
from tracim_backend.lib.utils.utils import current_date_for_filename
39
from tracim_backend.app_models.contents import CONTENT_STATUS
40
from tracim_backend.app_models.contents import ContentType
41
from tracim_backend.app_models.contents import CONTENT_TYPES
42
from tracim_backend.models.revision_protection import new_revision
43
from tracim_backend.models.auth import User
44
from tracim_backend.models.data import ActionDescription
45
from tracim_backend.models.data import ContentRevisionRO
46
from tracim_backend.models.data import Content
47
48
from tracim_backend.models.data import NodeTreeItem
49
from tracim_backend.models.data import RevisionReadStatus
50
from tracim_backend.models.data import UserRoleInWorkspace
51
from tracim_backend.models.data import Workspace
52
from tracim_backend.lib.utils.translation import fake_translator as _
53
from tracim_backend.models.context_models import RevisionInContext
54
from tracim_backend.models.context_models import PreviewAllowedDim
55
from tracim_backend.models.context_models import ContentInContext
56
57
__author__ = 'damien'
58
59
60
# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
61
def compare_content_for_sorting_by_type_and_name(
        content1: Content,
        content2: Content
) -> int:
    """
    Three-way comparison of two contents, suitable for cmp_to_key.

    Contents of the same type are ordered by their lower-cased label.
    Contents of different types are ordered by type: folder, page, thread,
    then file.

    :param content1: left-hand content
    :param content2: right-hand content
    :return:    1 if content1 > content2
                -1 if content1 < content2
                0 if content1 = content2
    :raise ValueError: when the types differ and one of them is not part of
        the ordered type list below
    """
    if content1.type != content2.type:
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
        ordered_type_slugs = [
            CONTENT_TYPES.Folder.slug,
            CONTENT_TYPES.Page.slug,
            CONTENT_TYPES.Thread.slug,
            CONTENT_TYPES.File.slug,
        ]
        left_key = ordered_type_slugs.index(content1.type)
        right_key = ordered_type_slugs.index(content2.type)
    else:
        left_key = content1.get_label().lower()
        right_key = content2.get_label().lower()

    # Boolean subtraction yields exactly -1, 0 or 1
    return (left_key > right_key) - (left_key < right_key)
98
99
# TODO - G.M - 2018-07-24 - [Cleanup] Is this function still needed?
100
def compare_tree_items_for_sorting_by_type_and_name(
        item1: NodeTreeItem,
        item2: NodeTreeItem
) -> int:
    """
    Compare two tree items by comparing the contents held in their ``node``
    attribute with compare_content_for_sorting_by_type_and_name.

    :param item1: left-hand tree item
    :param item2: right-hand tree item
    :return: 1, -1 or 0, as returned by the content comparison
    """
    left_content = item1.node
    right_content = item2.node
    return compare_content_for_sorting_by_type_and_name(
        left_content,
        right_content,
    )
105
106
107
class ContentApi(object):
108
109
    SEARCH_SEPARATORS = ',| '
110
    SEARCH_DEFAULT_RESULT_NB = 50
111
112
    # DISPLAYABLE_CONTENTS = (
113
    #     CONTENT_TYPES.Folder.slug,
114
    #     CONTENT_TYPES.File.slug,
115
    #     CONTENT_TYPES.Comment.slug,
116
    #     CONTENT_TYPES.Thread.slug,
117
    #     CONTENT_TYPES.Page.slug,
118
    #     CONTENT_TYPES.Page.slugLegacy,
119
    #     ContentType.MarkdownPage,
120
    # )
121
122
    def __init__(
            self,
            session: Session,
            current_user: typing.Optional[User],
            config,
            show_archived: bool = False,
            show_deleted: bool = False,
            show_temporary: bool = False,
            show_active: bool = True,
            all_content_in_treeview: bool = True,
            force_show_all_types: bool = False,
            disable_user_workspaces_filter: bool = False,
    ) -> None:
        """
        Store the session, user, config and visibility flags used by the
        query builders of this api, and create the preview manager.

        :param session: database session used for every query
        :param current_user: user the api acts for; may be None
        :param config: application config; its PREVIEW_CACHE_DIR is given to
            the preview manager
        :param show_archived: when False, base queries exclude archived
            contents
        :param show_deleted: when False, base queries exclude deleted
            contents
        :param show_temporary: when False, base queries exclude temporary
            contents
        :param show_active: when False, base queries keep only deleted or
            archived contents
        :param all_content_in_treeview: stored on the instance as
            _show_all_type_of_contents_in_treeview
        :param force_show_all_types: when True, base queries skip the
            content type filter
        :param disable_user_workspaces_filter: when True, the base content
            query skips the filter on the user's workspaces
        """
        # Collaborators
        self._session = session
        self._config = config
        self._user = current_user
        self._user_id = None
        if current_user:
            self._user_id = current_user.user_id

        # Visibility flags read by the query builders
        self._show_active = show_active
        self._show_archived = show_archived
        self._show_deleted = show_deleted
        self._show_temporary = show_temporary
        self._force_show_all_types = force_show_all_types
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview

        self.preview_manager = PreviewManager(
            self._config.PREVIEW_CACHE_DIR,
            create_folder=True,
        )
147
148
    @contextmanager
    def show(
            self,
            show_archived: bool=False,
            show_deleted: bool=False,
            show_temporary: bool=False,
    ) -> typing.Generator['ContentApi', None, None]:
        """
        Context manager that temporarily overrides the show_archived,
        show_deleted and show_temporary flags of this api. The previous
        values are restored when the context exits, even on error.

        :param show_archived: show archived contents
        :param show_deleted:  show deleted contents
        :param show_temporary:  show temporary contents
        """
        saved_flags = (
            self._show_archived,
            self._show_deleted,
            self._show_temporary,
        )

        try:
            self._show_archived = show_archived
            self._show_deleted = show_deleted
            self._show_temporary = show_temporary
            yield self
        finally:
            (
                self._show_archived,
                self._show_deleted,
                self._show_temporary,
            ) = saved_flags
175
176
    def get_content_in_context(self, content: Content) -> ContentInContext:
        """
        Wrap a content in a ContentInContext built with this api's session,
        config and current user.

        :param content: content to wrap
        :return: the ContentInContext wrapper
        """
        return ContentInContext(
            content,
            self._session,
            self._config,
            self._user,
        )
178
179
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
        """
        Wrap a revision in a RevisionInContext built with this api's session
        and config.

        :param revision: revision to wrap
        :return: the RevisionInContext wrapper
        """
        # TODO - G.M - 2018-06-173 - create revision in context object
        return RevisionInContext(
            revision,
            self._session,
            self._config,
        )
182
    
183
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
        """
        Build the Content/ContentRevision join condition: the revision must
        belong to the content and be the one with the highest revision_id
        for that content.

        :return: Content/ContentRevision query join condition
        """
        # Subquery correlated on Content: id of the newest revision of the row
        newest_revision_id = self._session.query(ContentRevisionRO.revision_id)
        newest_revision_id = newest_revision_id.filter(
            ContentRevisionRO.content_id == Content.id
        )
        newest_revision_id = newest_revision_id.order_by(
            ContentRevisionRO.revision_id.desc()
        )
        newest_revision_id = newest_revision_id.limit(1).correlate(Content)

        return and_(
            Content.id == ContentRevisionRO.content_id,
            ContentRevisionRO.revision_id == newest_revision_id,
        )
195
196
    def get_canonical_query(self) -> Query:
        """
        Return the base Content query, joined to ContentRevisionRO with the
        condition built by _get_revision_join.

        :return: Content/ContentRevision Query
        """
        content_query = self._session.query(Content)
        join_condition = self._get_revision_join()
        return content_query.join(ContentRevisionRO, join_condition)
203
204
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
205
    @classmethod
    def sort_tree_items(
        cls,
        content_list: typing.List[NodeTreeItem],
    )-> typing.List[NodeTreeItem]:
        """
        Sort tree items in place by type, then by label, using
        compare_tree_items_for_sorting_by_type_and_name.

        :param content_list: tree items to sort; the list itself is modified
        :return: the same list object, now sorted
        """
        content_list.sort(key=cmp_to_key(
            compare_tree_items_for_sorting_by_type_and_name,
        ))

        return content_list
219
220
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
221
    @classmethod
    def sort_content(
        cls,
        content_list: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Sort contents in place by type, then by label, using
        compare_content_for_sorting_by_type_and_name.

        :param content_list: contents to sort; the list itself is modified
        :return: the same list object, now sorted
        """
        ordering_key = cmp_to_key(compare_content_for_sorting_by_type_and_name)
        content_list.sort(key=ordering_key)
        return content_list
228
229
    def __real_base_query(
        self,
        workspace: Workspace = None,
    ) -> Query:
        """
        Build the content query shared by the other query builders: canonical
        query, optionally restricted by content type, workspace and the
        workspaces the current user can read.

        :param workspace: if given, only contents of this workspace are kept
        :return: the filtered Content query
        """
        query = self.get_canonical_query()

        # Exclude non displayable types
        if not self._force_show_all_types:
            allowed_slugs = CONTENT_TYPES.query_allowed_types_slugs()
            query = query.filter(Content.type.in_(allowed_slugs))

        if workspace:
            query = query.filter(
                Content.workspace_id == workspace.workspace_id
            )

        # Security layer: without a user, or with the filter disabled,
        # there is no workspace privilege filtering
        if not self._user or self._disable_user_workspaces_filter:
            return query

        user = self._session.query(User).get(self._user_id)
        # Workspaces where the user has at least the READER role
        readable_workspace_ids = []
        for user_role in user.roles:
            if user_role.role >= UserRoleInWorkspace.READER:
                readable_workspace_ids.append(user_role.workspace_id)

        # Contents without workspace stay reachable by their owner
        owned_without_workspace = and_(
            Content.workspace_id == None,  # noqa: E711
            Content.owner_id == self._user_id,
        )
        return query.filter(or_(
            Content.workspace_id.in_(readable_workspace_ids),
            owned_without_workspace,
        ))
259
260
    def _base_query(self, workspace: Workspace=None) -> Query:
        """
        Return the real base query with the show_active, show_deleted,
        show_archived and show_temporary flags of this api applied.

        :param workspace: if given, only contents of this workspace are kept
        :return: the filtered Content query
        """
        query = self.__real_base_query(workspace)

        if not self._show_active:
            # Active contents are hidden: keep only deleted or archived ones
            inactive_only = or_(
                Content.is_deleted == True,  # noqa: E712
                Content.is_archived == True,  # noqa: E712
            )
            query = query.filter(inactive_only)

        # Each disabled "show" flag excludes the contents carrying that state
        flag_columns = (
            (self._show_deleted, Content.is_deleted),
            (self._show_archived, Content.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in flag_columns:
            if not is_shown:
                query = query.filter(state_column == False)  # noqa: E712

        return query
278
279
    def __revisions_real_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Build the base ContentRevisionRO query, optionally restricted by
        content type, workspace and the workspaces the current user can read.

        :param workspace: if given, only revisions of this workspace are kept
        :return: the filtered ContentRevisionRO query
        """
        result = self._session.query(ContentRevisionRO)

        # Exclude non displayable types
        if not self._force_show_all_types:
            # The DISPLAYABLE_CONTENTS class attribute is commented out, so
            # reading it raised AttributeError; use the same allowed-types
            # source as __real_base_query instead.
            result = result.filter(
                Content.type.in_(CONTENT_TYPES.query_allowed_types_slugs())
            )

        if workspace:
            result = result.filter(
                ContentRevisionRO.workspace_id == workspace.workspace_id
            )

        if self._user:
            user = self._session.query(User).get(self._user_id)
            # Filter according to user workspaces
            workspace_ids = [
                r.workspace_id for r in user.roles
                if r.role >= UserRoleInWorkspace.READER
            ]
            result = result.filter(
                ContentRevisionRO.workspace_id.in_(workspace_ids)
            )

        return result
300
301
    def _revisions_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Return the real revisions base query with the show_deleted,
        show_archived and show_temporary flags of this api applied.

        :param workspace: if given, only revisions of this workspace are kept
        :return: the filtered ContentRevisionRO query
        """
        query = self.__revisions_real_base_query(workspace)

        # Each disabled "show" flag excludes the rows carrying that state
        flag_columns = (
            (self._show_deleted, ContentRevisionRO.is_deleted),
            (self._show_archived, ContentRevisionRO.is_archived),
            (self._show_temporary, Content.is_temporary),
        )
        for is_shown, state_column in flag_columns:
            if not is_shown:
                query = query.filter(state_column == False)  # noqa: E712

        return query
317
318
    def _hard_filtered_base_query(
        self,
        workspace: Workspace=None,
    ) -> Query:
        """
        Variant of the base query where the is_deleted, is_archived and
        is_temporary filters are applied to the parent content too.

        For each disabled "show" flag, the query is joined to an alias of the
        parent content and both the content and its parent must not carry
        the state. This is required for the search() function, which also
        searches in comments (for example) that may be 'not deleted' while
        the associated content is deleted.

        :param workspace: if given, only contents of this workspace are kept
        :return: the filtered Content query
        """
        query = self.__real_base_query(workspace)

        if not self._show_deleted:
            deleted_parent = aliased(Content)
            query = query.join(deleted_parent, Content.parent)
            query = query.filter(Content.is_deleted == False)  # noqa: E712
            query = query.filter(deleted_parent.is_deleted == False)  # noqa: E712,E501

        if not self._show_archived:
            archived_parent = aliased(Content)
            query = query.join(archived_parent, Content.parent)
            query = query.filter(Content.is_archived == False)  # noqa: E712
            query = query.filter(archived_parent.is_archived == False)  # noqa: E712,E501

        if not self._show_temporary:
            temporary_parent = aliased(Content)
            query = query.join(temporary_parent, Content.parent)
            query = query.filter(Content.is_temporary == False)  # noqa: E712
            query = query.filter(temporary_parent.is_temporary == False)  # noqa: E712,E501

        return query
352
353
    def get_base_query(
        self,
        workspace: Workspace,
    ) -> Query:
        """
        Public access to the base content query (see _base_query).

        :param workspace: only contents of this workspace are kept
        :return: the filtered Content query
        """
        return self._base_query(workspace=workspace)
358
359
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
360
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
361
    #     """
362
    #     This method returns child items (folders or items) for left bar treeview.
363
    # 
364
    #     :param parent:
365
    #     :param workspace:
366
    #     :param filter_by_allowed_content_types:
367
    #     :param removed_item_ids:
368
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
369
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
370
    #     :return:
371
    #     """
372
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
373
    #     removed_item_ids = removed_item_ids or []  # FDV
374
    # 
375
    #     if not allowed_node_types:
376
    #         allowed_node_types = [CONTENT_TYPES.Folder.slug]
377
    #     elif allowed_node_types==CONTENT_TYPES.Any_SLUG:
378
    #         allowed_node_types = ContentType.all()
379
    # 
380
    #     parent_id = parent.content_id if parent else None
381
    #     folders = self._base_query(workspace).\
382
    #         filter(Content.parent_id==parent_id).\
383
    #         filter(Content.type.in_(allowed_node_types)).\
384
    #         filter(Content.content_id.notin_(removed_item_ids)).\
385
    #         all()
386
    # 
387
    #     if not filter_by_allowed_content_types or \
388
    #                     len(filter_by_allowed_content_types)<=0:
389
    #         # Standard case for the left treeview: we want to show all contents
390
    #         # in the left treeview... so we still filter because for example
391
    #         # comments must not appear in the treeview
392
    #         return [folder for folder in folders \
393
    #                 if folder.type in ContentType.allowed_types_for_folding()]
394
    # 
395
    #     # Now this is a case of Folders only (used for moving content)
396
    #     # When moving a content, you must get only folders that allow to be filled
397
    #     # with the type of content you want to move
398
    #     result = []
399
    #     for folder in folders:
400
    #         for allowed_content_type in filter_by_allowed_content_types:
401
    # 
402
    #             is_folder = folder.type == CONTENT_TYPES.Folder.slug
403
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
404
    # 
405
    #             if is_folder and content_type__allowed:
406
    #                 result.append(folder)
407
    #                 break
408
    # 
409
    #     return result
410
411
    def create(self, content_type_slug: str, workspace: Workspace, parent: Content=None, label: str = '', filename: str = '', do_save=False, is_temporary: bool=False, do_notify=True) -> Content:
        """
        Build a new content of the given type and optionally save it.

        :param content_type_slug: slug of the content type to create; must
            not be the "any" slug
        :param workspace: workspace of the content; when falsy, the parent's
            workspace is used
        :param parent: parent content, if any
        :param label: label of the content; mutually exclusive with filename
        :param filename: file name of the content; mutually exclusive with
            label
        :param do_save: when True, add the content to the session and save it
        :param is_temporary: value of the content's is_temporary flag
        :param do_notify: forwarded to save() when do_save is True
        :return: the new content
        :raise UnallowedSubContent: if the parent or the workspace does not
            allow this content type
        :raise EmptyLabelNotAllowed: if neither label nor filename is given
            for a content that is not a comment
        """
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != CONTENT_TYPES.Any_SLUG
        assert not (label and filename)

        if content_type_slug == CONTENT_TYPES.Folder.slug and not label:
            label = self.generate_folder_label(workspace, parent)

        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
        # can call here with None. Must update create_comment to require the
        # workspace.
        if not workspace:
            workspace = parent.workspace

        content_type = CONTENT_TYPES.get_one_by_slug(content_type_slug)
        if parent and parent.properties and 'allowed_content' in parent.properties:
            allowed_content = parent.properties['allowed_content']
            if content_type.slug not in allowed_content or not allowed_content[content_type.slug]:
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))

        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        content_id=workspace.workspace_id,
                    )
                )

        content = Content()

        if filename:
            # INFO - G.M - 2018-07-04 - File_name setting automatically
            # set label and file_extension
            # The given filename must be used here: label is empty (or an
            # auto-generated folder label) whenever filename is provided.
            content.file_name = filename
        elif label:
            content.label = label
        else:
            if content_type_slug == CONTENT_TYPES.Comment.slug:
                # INFO - G.M - 2018-07-16 - Default label for comments is
                # empty string.
                content.label = ''
            else:
                raise EmptyLabelNotAllowed('Content of this type should have a valid label')  # nopep8

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if content.type in (
                CONTENT_TYPES.Page.slug,
                CONTENT_TYPES.Thread.slug,
        ):
            content.file_extension = '.html'

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content
478
479
    def create_comment(self, workspace: Workspace=None, parent: Content=None, content:str ='', do_save=False) -> Content:
        """
        Build a comment on the given parent content and optionally save it.

        :param workspace: workspace forwarded to create()
        :param parent: commented content; required and must not be a folder
        :param content: text of the comment, stored as its description
        :param do_save: when True, save the comment with the COMMENT action
        :return: the new comment content
        :raise EmptyCommentContentNotAllowed: if content is empty
        """
        # TODO: check parent allowed_type and workspace allowed_ type
        assert parent and parent.type != CONTENT_TYPES.Folder.slug
        if not content:
            raise EmptyCommentContentNotAllowed()

        # Saving is handled below so the description is set beforehand
        comment = self.create(
            CONTENT_TYPES.Comment.slug,
            workspace,
            parent=parent,
            label='',
            do_save=False,
            do_notify=False,
        )
        comment.description = content
        comment.revision_type = ActionDescription.COMMENT

        if do_save:
            self.save(comment, ActionDescription.COMMENT)
        return comment
499
500
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
        """
        This method is a hack to convert a node revision item into a node:
        the content is fetched with get_one() and its revision_to_serialize
        attribute is set to the given revision.

        :param content_id: id of the content
        :param content_type: type slug of the content
        :param workspace: workspace forwarded to get_one()
        :param revision_id: id of the revision to attach to the content
        :return: the content, with revision_to_serialize set
        :raise ValueError: if the revision does not belong to the content
        """
        content = self.get_one(content_id, content_type, workspace)

        revision_query = self._session.query(ContentRevisionRO)
        revision = revision_query.filter(
            ContentRevisionRO.revision_id == revision_id
        ).one()

        if revision.content_id != content.content_id:
            raise ValueError('Revision not found for given content')

        content.revision_to_serialize = revision.revision_id
        return content
519
520
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
        """
        Fetch one content by id through the base query.

        :param content_id: id of the content; when falsy, None is returned
        :param content_type: type slug the content must have, or the "any"
            slug to skip the type check
        :param workspace: if given, the content must be in this workspace
        :param parent: if given, the content must be a child of this content
        :return: the content, or None when content_id is falsy
        :raise ContentNotFound: if no content matches
        """
        if not content_id:
            return None

        query = self._base_query(workspace)
        query = query.filter(Content.content_id == content_id)

        if content_type != CONTENT_TYPES.Any_SLUG:
            query = query.filter(Content.type == content_type)

        if parent:
            query = query.filter(Content.parent_id == parent.content_id)

        try:
            return query.one()
        except NoResultFound as exc:
            # TODO - G.M - 2018-07-16 - Add better support for all the
            # different error cases that can happen here, like content
            # doesn't exist, wrong parent, wrong content_type, wrong
            # workspace, wrong access to this workspace, wrong base filter
            # according to content_status.
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
543
544
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
        """
        Fetch a revision directly by its id.

        :param revision_id: id of the revision to return; must not be None
        :param content: content the revision is expected to belong to; when
            None, the revision/content relation is not checked
        :return: the matching revision
        :raise RevisionDoesNotMatchThisContent: if content is given and the
            revision belongs to another content
        """
        assert revision_id is not None  # DYN_REMOVE

        revision_query = self._session.query(ContentRevisionRO)
        revision = revision_query.filter(
            ContentRevisionRO.revision_id == revision_id
        ).one()

        if not content or revision.content_id == content.content_id:
            return revision

        raise RevisionDoesNotMatchThisContent(
            'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
                revision_id=revision.revision_id,
                content_id=content.content_id,
            )
        )
563
564
    # INFO - A.P - 2017-07-03 - python file object getter
565
    # in case of we cook a version of preview manager that allows a pythonic
566
    # access to files
567
    # def get_one_revision_file(self, revision_id: int = None):
568
    #     """
569
    #     This function allows us to directly get a Python file object from its
570
    #     revision identifier.
571
    #     :param revision_id: The revision id of the file we want to return
572
    #     :return: The corresponding Python file object
573
    #     """
574
    #     revision = self.get_one_revision(revision_id)
575
    #     return DepotManager.get().get(revision.depot_file)
576
577
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
        """
        Return the path of the depot file attached to a revision.

        :param revision_id: The revision id of the filepath we want to return
        :return: The corresponding filepath
        """
        revision = self.get_one_revision(revision_id)
        stored_file = DepotManager.get().get(revision.depot_file)
        # The path is read from the stored file's private _file_path attribute
        return stored_file._file_path
589
590
    def get_one_by_label_and_parent(
            self,
            content_label: str,
            content_parent: Content=None,
    ) -> Content:
        """
        Fetch the content that has the given name directly under the given
        parent. The workspace searched is the parent's one; without parent,
        no workspace filter is applied and the content must have no parent.

        :param content_label: the content's label, or its file name (label
            plus extension) for a file
        :param content_parent: the parent content, if any
        :return: the corresponding Content
        """
        if content_parent:
            workspace = content_parent.workspace
            parent_id = content_parent.content_id
        else:
            workspace = None
            parent_id = None

        children_query = self._base_query(workspace).filter(
            Content.parent_id == parent_id
        )

        file_name, file_extension = os.path.splitext(content_label)

        # Files match on name and extension, threads and pages on the name
        # without extension, folders on the whole label
        matches_file = and_(
            Content.type == CONTENT_TYPES.File.slug,
            Content.label == file_name,
            Content.file_extension == file_extension,
        )
        matches_thread = and_(
            Content.type == CONTENT_TYPES.Thread.slug,
            Content.label == file_name,
        )
        matches_page = and_(
            Content.type == CONTENT_TYPES.Page.slug,
            Content.label == file_name,
        )
        matches_folder = and_(
            Content.type == CONTENT_TYPES.Folder.slug,
            Content.label == content_label,
        )

        return children_query.filter(
            or_(matches_file, matches_thread, matches_page, matches_folder)
        ).one()
630
631
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: [str]=None,
    ):
        """
        Return the content matching a label, a workspace and, optionally, the
        labels of its parent folders. Without parent labels, the content must
        have no parent.

        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing all of this
        :param content_parent_labels: Ordered list of labels representing path
            of folder (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :return: Found Content
        """
        base_query = self._base_query(workspace)

        # Resolve the parent folder when a parent path is given
        parent_folder = None
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )
        parent_id = parent_folder.content_id if parent_folder else None

        # Match the content on its label
        content_query = self.filter_query_for_content_label_as_path(
            query=base_query,
            content_label_as_file=content_label,
        )

        # A None parent_id selects contents without parent
        content_query = content_query.filter(Content.parent_id == parent_id)
        content_query = content_query.filter(
            Content.workspace_id == workspace.workspace_id,
        )

        return content_query.order_by(Content.revision_id.desc()).one()
683
684
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
685
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: [str],
            workspace: Workspace,
    ) -> Content:
        """
        Walk down a path of folder labels and return the last folder.
        TODO BS 20161124: Not safe if web interface allow folder duplicate names

        :param path_labels: List of labels representing path of folder
        (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder, or None when path_labels is empty
        """
        base_query = self._base_query(workspace)
        current_folder = None

        for folder_label in path_labels:
            # The first label is searched among contents without parent,
            # the following ones inside the folder found just before
            parent_id = current_folder.content_id if current_folder else None

            current_folder = (
                base_query
                .filter(
                    Content.type == CONTENT_TYPES.Folder.slug,
                    Content.label == folder_label,
                    Content.workspace_id == workspace.workspace_id,
                )
                .filter(Content.parent_id == parent_id)
                .order_by(Content.revision_id.desc())
                .one()
            )

        return current_folder
727
728
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Restrict a query to the contents whose name matches the given label.

        Files, threads and pages must match both the name without extension
        and the extension; folders must match the whole label.

        :param query: query to modify
        :param content_label_as_file: label in this
        FILE version, use Content.get_label_as_file().
        :param is_case_sensitive: Take care about case or not
        :return: modified query
        """
        file_name, file_extension = os.path.splitext(content_label_as_file)

        if is_case_sensitive:
            label_filter = Content.label == content_label_as_file
            file_name_filter = Content.label == file_name
            file_extension_filter = Content.file_extension == file_extension
        else:
            # Both sides are lower-cased on the database side
            label_filter = \
                func.lower(Content.label) == func.lower(content_label_as_file)
            file_name_filter = \
                func.lower(Content.label) == func.lower(file_name)
            file_extension_filter = \
                func.lower(Content.file_extension) == func.lower(file_extension)

        slugs_matched_on_name_and_extension = (
            CONTENT_TYPES.File.slug,
            CONTENT_TYPES.Thread.slug,
            CONTENT_TYPES.Page.slug,
        )
        per_type_filters = [
            and_(
                Content.type == type_slug,
                file_name_filter,
                file_extension_filter,
            )
            for type_slug in slugs_matched_on_name_and_extension
        ]
        per_type_filters.append(and_(
            Content.type == CONTENT_TYPES.Folder.slug,
            label_filter,
        ))

        return query.filter(or_(*per_type_filters))
777
778
779
    def get_pdf_preview_path(
            self,
            content_id: int,
            revision_id: int,
            page: int
    ) -> str:
        """
        Get the pdf preview of one page of a content revision
        :param content_id: id of content (only used in the error message)
        :param revision_id: id of content revision
        :param page: page number of the preview, useful for multipage content
        :return: preview_path as string
        :raise PageOfPreviewNotFound: if page is greater than or equal to the
        page count given by the preview manager
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        page_count = self.preview_manager.get_page_nb(revision_file_path)
        if page >= page_count:
            error_message = \
                'page {page} of content {content_id} does not exist'.format(
                    page=page,
                    content_id=content_id
                )
            raise PageOfPreviewNotFound(error_message)
        return self.preview_manager.get_pdf_preview(
            revision_file_path,
            page=page
        )
805
806
    def get_full_pdf_preview_path(self, revision_id: int) -> str:
        """
        Get the full (all pages) pdf preview of a content revision
        :param revision_id: id of revision
        :return: path of the full pdf preview of this revision, as given by
        the preview manager
        """
        revision_file_path = self.get_one_revision_filepath(revision_id)
        return self.preview_manager.get_pdf_preview(revision_file_path)
815
816
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Get the jpg preview settings read from the config: the "restricted"
        value (PREVIEW_JPG_RESTRICTED_DIMS) and the allowed dimensions
        (PREVIEW_JPG_ALLOWED_DIMS), wrapped in a PreviewAllowedDim.
        """
        config = self._config
        restricted = config.PREVIEW_JPG_RESTRICTED_DIMS
        allowed_dims = config.PREVIEW_JPG_ALLOWED_DIMS
        return PreviewAllowedDim(restricted, allowed_dims)
824
825
    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page: int,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Get jpg preview of revision of content
        :param content_id: id of content (only used in the error message)
        :param revision_id: id of content revision
        :param page: page number of the preview, useful for multipage content
        :param width: width in pixel
        :param height: height in pixel
        :return: preview_path as string
        :raise PageOfPreviewNotFound: if page is greater than or equal to the
        page count given by the preview manager
        :raise PreviewDimNotAllowed: if dimensions are restricted by the
        config and the requested size is not one of the allowed ones
        """
        file_path = self.get_one_revision_filepath(revision_id)
        if page >= self.preview_manager.get_page_nb(file_path):
            # Same exception type as get_pdf_preview_path(), so that callers
            # can handle a missing page uniformly for both preview kinds.
            raise PageOfPreviewNotFound(
                'page {page} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                    page=page,
                    revision_id=revision_id,
                    content_id=content_id,
                ),
            )

        allowed_dims = self._config.PREVIEW_JPG_ALLOWED_DIMS
        if not width and not height:
            # No size requested: fall back to the first allowed dimension
            width = allowed_dims[0].width
            height = allowed_dims[0].height

        if self._config.PREVIEW_JPG_RESTRICTED_DIMS:
            is_allowed_dim = any(
                width == preview_dim.width and height == preview_dim.height
                for preview_dim in allowed_dims
            )
            if not is_allowed_dim:
                raise PreviewDimNotAllowed(
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
                        width=width,
                        height=height,
                    )
                )

        return self.preview_manager.get_jpeg_preview(
            file_path,
            page=page,
            width=width,
            height=height,
        )
875
876
    def _get_all_query(
        self,
        parent_id: int = None,
        content_type_slug: str = CONTENT_TYPES.Any_SLUG,
        workspace: Workspace = None
    ) -> Query:
        """
        Build the base query with optional content type and parent filters
        :param parent_id: filter by parent_id. None means "no parent filter",
        0 or False means "only contents without parent".
        :param content_type_slug: filter by content_type slug; slug aliases
        of the matching content type are accepted too.
        :param workspace: filter by workspace
        :return: the filtered query
        """
        assert parent_id is None or isinstance(parent_id, int)
        assert content_type_slug is not None
        query = self._base_query(workspace)

        if content_type_slug != CONTENT_TYPES.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type = CONTENT_TYPES.get_one_by_slug(content_type_slug)
            accepted_slugs = [content_type.slug]
            accepted_slugs += content_type.slug_alias or []
            query = query.filter(Content.type.in_(accepted_slugs))

        if parent_id:
            query = query.filter(Content.parent_id == parent_id)
        elif parent_id == 0 or parent_id is False:
            # "== None" is intended: it builds the SQL "IS NULL" comparison
            query = query.filter(Content.parent_id == None)  # nopep8

        return query
908
909
    def get_all(
        self,
        parent_id: int=None,
        content_type: str=CONTENT_TYPES.Any_SLUG,
        workspace: Workspace=None,
    ) -> typing.List[Content]:
        """
        Run the query built by _get_all_query() and return all its results
        :param parent_id: filter by parent_id
        :param content_type: filter by content type slug
        :param workspace: filter by workspace
        :return: list of content
        """
        query = self._get_all_query(parent_id, content_type, workspace)
        return query.all()
911
912
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
913
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
914
    #     """
915
    #     Return parent_id childs of given content_types
916
    #     :param parent_id: parent id
917
    #     :param content_types: list of types
918
    #     :param workspace: workspace filter
919
    #     :return: list of content
920
    #     """
921
    #     resultset = self._base_query(workspace)
922
    #     resultset = resultset.filter(Content.type.in_(content_types))
923
    #
924
    #     if parent_id:
925
    #         resultset = resultset.filter(Content.parent_id==parent_id)
926
    #     if parent_id is False:
927
    #         resultset = resultset.filter(Content.parent_id == None)
928
    #
929
    #     return resultset.all()
930
931
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
932
    # TODO find another name to filter on is_deleted / is_archived
933
    def get_all_with_filter(
        self,
        parent_id: int=None,
        content_type: str=CONTENT_TYPES.Any_SLUG,
        workspace: Workspace=None,
    ) -> typing.List[Content]:
        """
        Get contents of a parent whose deleted / archived / temporary flags
        are equal to the matching _show_* values of this api instance.
        :param parent_id: parent id the contents must have (always applied,
        even when None)
        :param content_type: content type slug to filter on, unless it is
        the "any" slug
        :param workspace: workspace given to the base query
        :return: list of content
        """
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
        assert content_type is not None# DYN_REMOVE
        assert isinstance(content_type, str) # DYN_REMOVE

        query = self._base_query(workspace)

        if content_type != CONTENT_TYPES.Any_SLUG:
            query = query.filter(Content.type==content_type)

        criteria = (
            Content.is_deleted == self._show_deleted,
            Content.is_archived == self._show_archived,
            Content.is_temporary == self._show_temporary,
            Content.parent_id==parent_id,
        )
        for criterion in criteria:
            query = query.filter(criterion)

        return query.all()
950
951
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
952
    def get_all_without_exception(
        self,
        content_type: str,
        workspace: Workspace=None,
    ) -> typing.List[Content]:
        """
        Get all contents of the base query, optionally limited to one type
        :param content_type: content type slug to filter on, unless it is
        the "any" slug
        :param workspace: workspace given to the base query
        :return: list of content
        """
        assert content_type is not None# DYN_REMOVE

        query = self._base_query(workspace)
        wants_any_type = content_type == CONTENT_TYPES.Any_SLUG
        if not wants_any_type:
            query = query.filter(Content.type==content_type)

        return query.all()
961
962
    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        """
        Get the most recently updated contents having at least one revision
        not read by the current user.

        Comments are replaced by their parent content, and each content is
        returned only once.
        :param parent_id: if given (truthy), keep only contents with this
        parent id
        :param content_type: content type slug to filter on; with the "any"
        slug, every type except folders is kept
        :param workspace: workspace given to the base queries
        :param limit: maximum number of contents to return
        :return: list of content, most recently updated first
        """
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
        assert content_type is not None# DYN_REMOVE
        assert isinstance(content_type, str) # DYN_REMOVE

        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
            .filter(RevisionReadStatus.user_id==self._user_id)

        not_read_revisions = self._revisions_base_query(workspace) \
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
            .filter(Workspace.is_deleted.is_(False)) \
            .subquery()

        # Ids of every content owning at least one unread revision
        not_read_content_ids_query = self._session.query(
            distinct(not_read_revisions.c.content_id)
        )
        not_read_content_ids = [row[0] for row in not_read_content_ids_query]

        not_read_contents = self._base_query(workspace) \
            .filter(Content.content_id.in_(not_read_content_ids)) \
            .order_by(desc(Content.updated))

        if content_type != CONTENT_TYPES.Any_SLUG:
            not_read_contents = not_read_contents.filter(
                Content.type==content_type)
        else:
            not_read_contents = not_read_contents.filter(
                Content.type!=CONTENT_TYPES.Folder.slug)

        if parent_id:
            not_read_contents = not_read_contents.filter(
                Content.parent_id==parent_id)

        result = []
        for item in not_read_contents:
            if CONTENT_TYPES.Comment.slug == item.type:
                new_item = item.parent
            else:
                new_item = item

            # INFO - D.A. - 2015-05-20
            # We do not want to show only one item if the last 10 items are
            # comments about one thread for example
            if new_item not in result:
                result.append(new_item)

            if len(result) >= limit:
                break

        return result
976
977
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
978
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
979
    #     assert content_type is not None# DYN_REMOVE
980
    #
981
    #     resultset = self._base_query(workspace)
982
    #
983
    #     if content_type != CONTENT_TYPES.Any_SLUG:
984
    #         resultset = resultset.filter(Content.type==content_type)
985
    #
986
    #     return resultset.all()
987
988
    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]= None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        Get contents sorted by last update, most recent first. The update
        of a comment counts as an update of its parent content, and each
        content appears only once.
        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: if given, only contents found after this one
        in the sorted list (i.e. updated before it) are returned.
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content
        """
        query = self._get_all_query(workspace=workspace)
        if content_ids:
            is_selected_content = Content.content_id.in_(content_ids)
            is_comment_of_selected = and_(
                Content.parent_id.in_(content_ids),
                Content.type == CONTENT_TYPES.Comment.slug
            )
            query = query.filter(
                or_(is_selected_content, is_comment_of_selected)
            )
        query = query.order_by(desc(Content.updated))

        selected = []
        # Contents met before "before_content" is reached (itself included)
        skipped_as_too_recent = []
        reached_before_content = False
        for content in query:
            if CONTENT_TYPES.Comment.slug == content.type:
                active_content = content.parent
            else:
                active_content = content

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            hidden = (
                (not self._show_deleted and active_content.is_deleted)
                or (not self._show_archived and active_content.is_archived)
            )
            if hidden:
                continue

            already_seen = (
                active_content in selected
                or active_content in skipped_as_too_recent
            )
            if not already_seen:
                if before_content and not reached_before_content:
                    skipped_as_too_recent.append(active_content)
                else:
                    selected.append(active_content)

                if before_content and active_content == before_content:
                    reached_before_content = True

            if limit and len(selected) >= limit:
                break

        return selected
1054
1055
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1056
    # usable and efficient for tracim v2 to get content with read/unread status
1057
    # instead of relying on has_new_information_for()
1058
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1059
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1060
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1061
    #     assert content_type is not None# DYN_REMOVE
1062
    #     assert isinstance(content_type, str) # DYN_REMOVE
1063
    #
1064
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1065
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1066
    #
1067
    #     not_read_revisions = self._revisions_base_query(workspace) \
1068
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1069
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1070
    #         .filter(Workspace.is_deleted.is_(False)) \
1071
    #         .subquery()
1072
    #
1073
    #     not_read_content_ids_query = self._session.query(
1074
    #         distinct(not_read_revisions.c.content_id)
1075
    #     )
1076
    #     not_read_content_ids = list(map(
1077
    #         itemgetter(0),
1078
    #         not_read_content_ids_query,
1079
    #     ))
1080
    #
1081
    #     not_read_contents = self._base_query(workspace) \
1082
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1083
    #         .order_by(desc(Content.updated))
1084
    #
1085
    #     if content_type != CONTENT_TYPES.Any_SLUG:
1086
    #         not_read_contents = not_read_contents.filter(
1087
    #             Content.type==content_type)
1088
    #     else:
1089
    #         not_read_contents = not_read_contents.filter(
1090
    #             Content.type!=CONTENT_TYPES.Folder.slug)
1091
    #
1092
    #     if parent_id:
1093
    #         not_read_contents = not_read_contents.filter(
1094
    #             Content.parent_id==parent_id)
1095
    #
1096
    #     result = []
1097
    #     for item in not_read_contents:
1098
    #         new_item = None
1099
    #         if CONTENT_TYPES.Comment.slug == item.type:
1100
    #             new_item = item.parent
1101
    #         else:
1102
    #             new_item = item
1103
    #
1104
    #         # INFO - D.A. - 2015-05-20
1105
    #         # We do not want to show only one item if the last 10 items are
1106
    #         # comments about one thread for example
1107
    #         if new_item not in result:
1108
    #             result.append(new_item)
1109
    #
1110
    #         if len(result) >= limit:
1111
    #             break
1112
    #
1113
    #     return result
1114
1115
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        Store the given dict under the 'allowed_content' key of the content
        properties. The properties are copied, updated, then assigned back
        to the content.
        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        updated_properties = content.properties.copy()
        updated_properties['allowed_content'] = allowed_content_dict
        content.properties = updated_properties
1130
1131
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        Replace the allowed sub-content types of a content
        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :return: nothing
        :raise ContentTypeNotExist: if one slug is not part of
        CONTENT_TYPES.endpoint_allowed_types_slug(); the content is left
        untouched in that case.
        """
        accepted_types = {}
        for type_slug in allowed_content_type_slug_list:
            if type_slug in CONTENT_TYPES.endpoint_allowed_types_slug():
                accepted_types[type_slug] = True
                continue
            raise ContentTypeNotExist('Content_type {} does not exist'.format(type_slug))  # nopep8

        self._set_allowed_content(content, accepted_types)
1145
1146
    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Drop the 'allowed_content' entry of the content properties, if any,
        to go back to the default allowed_content_types
        :param content: the given content instance
        :return: nothing
        """
        raw_properties = content._properties
        if not raw_properties or 'allowed_content' not in raw_properties:
            # Nothing stored: nothing to restore
            return
        remaining_properties = content.properties.copy()
        del remaining_properties['allowed_content']
        content.properties = remaining_properties
1156
1157
    def set_status(self, content: Content, new_status: str):
        """
        Change the status of a content and flag it as a status update
        :param content: content to update
        :param new_status: new status, must be one of
        CONTENT_STATUS.get_all_slugs_values()
        :raise ValueError: if the given status is not allowed
        """
        if new_status not in CONTENT_STATUS.get_all_slugs_values():
            raise ValueError('The given value {} is not allowed'.format(new_status))
        content.status = new_status
        content.revision_type = ActionDescription.STATUS_UPDATE
1163
1164
    def move(self,
             item: Content,
             new_parent: Content,
             must_stay_in_same_workspace: bool=True,
             new_workspace: Workspace=None,
    ):
        """
        Move a content under a new parent and/or into a new workspace.

        Every check is done before the item is modified, so that a refused
        move leaves the item untouched.
        :param item: content to move
        :param new_parent: new parent of the content (may be None)
        :param must_stay_in_same_workspace: if True, refuse a new parent
        which belongs to another workspace than the item
        :param new_workspace: workspace to move the item to. If not given,
        the workspace of the new parent (if any) is used.
        :raise ValueError: if must_stay_in_same_workspace is set and the new
        parent is in another workspace than the item
        :raise WorkspacesDoNotMatch: if new parent and new workspace are
        both given and the new parent is not in the new workspace
        """
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        if new_workspace and new_parent and \
                new_parent.workspace_id != new_workspace.workspace_id:
            raise WorkspacesDoNotMatch(
                'new parent workspace and new workspace should be the same.'
            )

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
        elif new_parent:
            item.workspace = new_parent.workspace

        item.revision_type = ActionDescription.MOVE
1187
1188
    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revision included. Children not included, see
        "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item. If not given,
        the copy keeps the parent and the workspace of the original item.
        :param new_label: new label of the new copied item. If not given,
        the label of the original item is kept.
        :param do_save: save the copy (with the COPY action) or not
        :param do_notify: notify copy or not (only used when do_save is set)
        :return: Newly copied item
        :raise ValueError: if neither new parent nor new label is given, or
        if both are the same as those of the original item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something else than value error
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label

        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.label = label
            rev.revision_type = ActionDescription.COPY
            # Copy / update / assign back, like the other methods of this
            # class which change properties: updating the value returned by
            # the getter in place is presumably not enough to store it
            # (NOTE(review): confirm against the "properties" accessor).
            properties = rev.properties.copy()
            properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
            rev.properties = properties
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content
1235
1236
    def copy_children(self, origin_content: Content, new_content: Content):
        """
        Copy every child of a content under another content, using copy()
        :param origin_content: content whose children are copied
        :param new_content: parent given to each copied child
        """
        children = origin_content.children
        for origin_child in children:
            self.copy(origin_child, new_content)
1239
1240
    def move_recursively(self, item: Content,
                         new_parent: Content, new_workspace: Workspace):
        """
        Move a content, then move each of its children under it, inside the
        given workspace. Each moved content is saved without notification.
        :param item: content to move
        :param new_parent: new parent of the content
        :param new_workspace: workspace the content and its children go to
        """
        # The "same workspace" check is disabled: the workspace may change
        self.move(item, new_parent, False, new_workspace)
        self.save(item, do_notify=False)

        for child_content in item.children:
            revision_context = new_revision(
                session=self._session,
                tm=transaction.manager,
                content=child_content
            )
            with revision_context:
                self.move_recursively(child_content, item, new_workspace)
1253
1254
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        """
        Update label and description of a content and flag it as an edition
        :param item: content to update; the current user becomes its owner
        :param new_label: new label, must not be empty
        :param new_content: new description; if empty, the current
        description is kept
        :return: the updated content
        :raise SameValueError: if label and description are both unchanged
        :raise EmptyLabelNotAllowed: if the new label is empty
        """
        nothing_changed = \
            item.label==new_label and item.description==new_content
        if nothing_changed:
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
            # Internatization disabled in libcontent for now.
            raise SameValueError('The content did not changed')
        if not new_label:
            raise EmptyLabelNotAllowed()
        item.owner = self._user
        item.label = new_label
        # TODO: convert urls into links
        item.description = new_content or item.description
        item.revision_type = ActionDescription.EDITION
        return item
1266
1267
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        """
        Replace the file stored by a content and flag it as a new revision
        :param item: content to update; the current user becomes its owner
        :param new_filename: new file name
        :param new_mimetype: new mimetype
        :param new_content: new file data
        :return: the updated content
        :raise SameValueError: if both mimetype and file data are unchanged
        """
        same_mimetype = new_mimetype == item.file_mimetype
        # The stored file is read only when the mimetype did not change
        if same_mimetype and new_content == item.depot_file.file.read():
            raise SameValueError('The content did not changed')
        item.owner = self._user
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        file_intent = FileIntent(
            new_content,
            new_filename,
            new_mimetype,
        )
        item.depot_file = file_intent
        item.revision_type = ActionDescription.REVISION
        return item
1281
1282
    def archive(self, content: Content):
        """
        Flag a content as archived; the current user becomes its owner and
        its label gets an "-archived-<date>" suffix.
        :param content: content to archive
        """
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        archived_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='archived',
            date=current_date_for_filename()
        )
        content.label = archived_label
        content.revision_type = ActionDescription.ARCHIVING
1294
1295
    def unarchive(self, content: Content):
        """
        Remove the archived flag of a content; the current user becomes its
        owner. The label of the content is left as it is.
        :param content: content to unarchive
        """
        current_user = self._user
        content.owner = current_user
        content.is_archived = False
        content.revision_type = ActionDescription.UNARCHIVING
1299
1300
    def delete(self, content: Content):
        """
        Flag a content as deleted; the current user becomes its owner and
        its label gets a "-deleted-<date>" suffix.
        :param content: content to delete
        """
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        deleted_label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='deleted',
            date=current_date_for_filename()
        )
        content.label = deleted_label
        content.revision_type = ActionDescription.DELETION
1312
1313
    def undelete(self, content: Content):
        """
        Remove the deleted flag of a content; the current user becomes its
        owner. The label of the content is left as it is.
        :param content: content to undelete
        """
        current_user = self._user
        content.owner = current_user
        content.is_deleted = False
        content.revision_type = ActionDescription.UNDELETION
1317
1318
    def mark_read__all(
        self,
        read_datetime: datetime.datetime=None,
        do_flush: bool=True,
        recursive: bool=True,
    ):
        """
        Same as mark_read__workspace(), called without any workspace
        :param read_datetime: given as is to mark_read__workspace()
        :param do_flush: given as is to mark_read__workspace()
        :param recursive: given as is to mark_read__workspace()
        :return: the result of mark_read__workspace()
        """
        return self.mark_read__workspace(
            workspace=None,
            read_datetime=read_datetime,
            do_flush=do_flush,
            recursive=recursive,
        )
1324
1325
    def mark_read__workspace(
        self,
        workspace : Workspace,
        read_datetime: datetime.datetime=None,
        do_flush: bool=True,
        recursive: bool=True,
    ):
        """
        Mark as read every content returned by get_last_active() which has
        new information for the current user.
        :param workspace: workspace given to get_last_active()
        :param read_datetime: given as is to mark_read()
        :param do_flush: given as is to mark_read()
        :param recursive: given as is to mark_read()
        """
        for active_content in self.get_last_active(workspace):
            if not active_content.has_new_information_for(self._user):
                continue
            self.mark_read(active_content, read_datetime, do_flush, recursive)
1335
1336
    def mark_read(self, content: Content,
                  read_datetime: datetime.datetime=None,
                  do_flush: bool=True, recursive: bool=True) -> Content:
        """
        Mark every revision of a content as read by the current user
        :param content: content to mark as read
        :param read_datetime: read date to store, "now" if not given
        :param do_flush: flush the session at the end or not
        :param recursive: if True, also mark as read the valid children of
        the content and, when the content is a comment, its parent and the
        other comments of this parent
        :return: the given content
        """
        assert self._user
        assert content

        # The algorithm is:
        # 1. define the read datetime
        # 2. update all revisions related to current Content
        # 3. do the same for all child revisions
        #    (ie parent_id is content_id of current content)

        read_datetime = read_datetime or datetime.datetime.now()

        content_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id==content.content_id).all()
        for content_revision in content_revisions:
            content_revision.read_by[self._user] = read_datetime

        if recursive:
            # Related contents share the read date and are flushed only
            # once, at the end of the top-level call
            related_options = dict(read_datetime=read_datetime, do_flush=False)

            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            for child in content.get_valid_children():
                self.mark_read(child, **related_options)

            if CONTENT_TYPES.Comment.slug == content.type:
                self.mark_read(
                    content.parent,
                    recursive=False,
                    **related_options
                )
                for sibling_comment in content.parent.get_comments():
                    if sibling_comment != content:
                        self.mark_read(
                            sibling_comment,
                            recursive=False,
                            **related_options
                        )

        if do_flush:
            self.flush()

        return content
1380
1381
    def mark_unread(self, content: Content, do_flush=True) -> Content:
        """
        Remove the read status of the current user from every revision of a
        content and, recursively, of its valid children.
        :param content: content to mark as unread
        :param do_flush: flush the session at the end or not
        :return: the given content
        """
        assert self._user
        assert content

        content_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id==content.content_id).all()

        for content_revision in content_revisions:
            try:
                del content_revision.read_by[self._user]
            except KeyError:
                # This revision was not read by the user: nothing to remove
                continue

        for valid_child in content.get_valid_children():
            self.mark_unread(valid_child, do_flush=False)

        if do_flush:
            self.flush()

        return content
1401
1402
    def flush(self):
        """
        Flush the session used by this api
        """
        session = self._session
        session.flush()
1404
1405
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set, one of
        ActionDescription.allowed_values(). If not given, the current
        revision type is kept unless the check below falls back to EDITION.
        :param do_flush: add the content to the session, flush it and mark
        the content as read for the current user
        :param do_notify: call do_notify() for this content
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        if not action_description:
            # See if the last action has been modified
            # NOTE(review): the second test relies on the length of the
            # value returned by get_history() - confirm it can be <= 0.
            revision_type_untouched = (
                content.revision_type==None
                or len(get_history(content.revision, 'revision_type'))<=0
            )
            if revision_type_untouched:
                # The action has not been modified, so we set it to default edition
                action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flush because of the use
            # of triggers for content creation
            #
            # (when creating a content, actually this is an insert of a new
            # revision in content_revisions ; so the mark_read operation need
            # to get full real data from database before to be prepared.
            session = self._session
            session.add(content)
            session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of Content
            # This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)
1442
1443
    def do_notify(self, content: Content):
        """
        Allow to force notification for a given content. By default, it is
        called during the .save() operation
        :param content: content given to the notifier as updated content
        :return: nothing
        """
        notifier = NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        )
        notifier.notify_content_update(content)
1455
1456
    def get_keywords(self, search_string, search_string_separators=None) -> [str]:
        """
        Split a search string into stripped keywords
        :param search_string: a string of keywords separated by one of the
        separators
        :param search_string_separators: regular expression used to split
        the search string, ContentApi.SEARCH_SEPARATORS if not given
        :return: a list of str (each keyword = 1 entry); an empty list if
        the search string is empty
        """
        separators = search_string_separators or ContentApi.SEARCH_SEPARATORS

        if not search_string:
            return []

        raw_keywords = re.split(separators, search_string)
        return list(map(str.strip, raw_keywords))
1469
1470
    def search(self, keywords: typing.List[str]) -> typing.Optional[Query]:
        """
        Build the query of contents whose label or description contains at
        least one of the given keywords (ILIKE '%keyword%' match).

        :param keywords: keywords to look for
        :return: the query (not executed, no explicit ordering), or None when
            no keyword is given
        """
        # Truthiness check also covers keywords=None, which len() would reject
        # with a TypeError.
        if not keywords:
            return None

        patterns = ['%{}%'.format(keyword) for keyword in keywords]
        label_filters = [Content.label.ilike(pattern) for pattern in patterns]
        description_filters = [
            Content.description.ilike(pattern) for pattern in patterns
        ]

        # A content is kept as soon as one keyword matches its label or
        # its description.
        return self._hard_filtered_base_query().\
            filter(or_(*(label_filters + description_filters))).\
            options(joinedload('children_revisions')).\
            options(joinedload('parent'))
1486
1487
    def get_all_types(self) -> typing.List[ContentType]:
        """
        Return the content type of every slug listed by
        CONTENT_TYPES.endpoint_allowed_types_slug(), in the same order.

        :return: list of content types, each one resolved with
            CONTENT_TYPES.get_one_by_slug()
        """
        return [
            CONTENT_TYPES.get_one_by_slug(slug)
            for slug in CONTENT_TYPES.endpoint_allowed_types_slug()
        ]
1494
1495
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1496
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Remove from the given list every content located under an archived
        or deleted content, then return that same list.

        The list is modified in place.
        :param contents: List of contents to parse
        :return: the given list, without the unavailable contents
        """
        # Collect first, remove afterwards: the list is never mutated while
        # it is being iterated.
        unavailable = [
            candidate for candidate in contents
            if self.content_under_deleted(candidate)
            or self.content_under_archived(candidate)
        ]
        for candidate in unavailable:
            contents.remove(candidate)
        return contents
1508
1509
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1510
    def content_under_deleted(self, content: Content) -> bool:
        """
        Tell whether one of the ancestors of the content is deleted.

        Only the parents are inspected: the is_deleted flag of the content
        itself is not looked at.
        :param content: content whose ancestors are checked
        :return: True if a parent (at any level) is deleted, False otherwise
        """
        # Walk up the parent chain until a deleted ancestor or the top is met.
        ancestor = content.parent
        while ancestor:
            if ancestor.is_deleted:
                return True
            ancestor = ancestor.parent
        return False
1517
1518
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1519
    def content_under_archived(self, content: Content) -> bool:
        """
        Tell whether one of the ancestors of the content is archived.

        Only the parents are inspected: the is_archived flag of the content
        itself is not looked at.
        :param content: content whose ancestors are checked
        :return: True if a parent (at any level) is archived, False otherwise
        """
        # Walk up the parent chain until an archived ancestor or the top
        # is met.
        ancestor = content.parent
        while ancestor:
            if ancestor.is_archived:
                return True
            ancestor = ancestor.parent
        return False
1526
1527
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1528
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return the Content whose properties contain the given name/value pair.

        The match is a textual LIKE on Content._properties, looking for the
        '"<property_name>": "<property_value>"' fragment.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.
        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace which contains the Content
        :return: Found Content
        """
        base_query = self._base_query(workspace=workspace)

        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        like_pattern = '%"{property_name}": "{property_value}"%'.format(
            property_name=property_name,
            property_value=property_value,
        )
        matching = base_query.filter(Content._properties.like(like_pattern))
        return matching.one()
1555
1556
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
1557
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a label for a new folder.

        Contents from self._base_query(workspace=workspace) whose label starts
        with _('New folder') are counted (only those having the given parent
        when one is provided). The label is _('New folder {0}') formatted
        with that count plus one.
        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None)
        :return: Generated folder name
        """
        base_query = self._base_query(workspace=workspace)
        label_prefix = '{0}%'.format(_('New folder'))
        query = base_query.filter(Content.label.ilike(label_prefix))

        if parent:
            # Restrict the count to the direct children of the given parent.
            query = query.filter(Content.parent == parent)

        label_template = _('New folder {0}')
        return label_template.format(query.count() + 1)
1578