Passed
Pull Request — develop (#31)
by inkhey
02:43
created

Api.create()   F

Complexity

Conditions 18

Size

Total Lines 67
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 43
dl 0
loc 67
rs 1.2
c 0
b 0
f 0
cc 18
nop 9

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like tracim_backend.lib.core.content.ContentApi.create() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# -*- coding: utf-8 -*-
2
from contextlib import contextmanager
3
import os
4
import datetime
5
import re
6
import typing
7
from operator import itemgetter
8
9
import transaction
10
from preview_generator.manager import PreviewManager
11
from sqlalchemy import func
12
from sqlalchemy.orm import Query
13
from depot.manager import DepotManager
14
from depot.io.utils import FileIntent
15
import sqlalchemy
16
from sqlalchemy.orm import aliased
17
from sqlalchemy.orm import joinedload
18
from sqlalchemy.orm.attributes import get_history
19
from sqlalchemy.orm.exc import NoResultFound
20
from sqlalchemy.orm.session import Session
21
from sqlalchemy import desc
22
from sqlalchemy import distinct
23
from sqlalchemy import or_
24
from sqlalchemy.sql.elements import and_
25
26
from tracim_backend.lib.utils.utils import cmp_to_key
27
from tracim_backend.lib.core.notifications import NotifierFactory
28
from tracim_backend.exceptions import SameValueError
29
from tracim_backend.exceptions import UnallowedSubContent
30
from tracim_backend.exceptions import ContentTypeNotExist
31
from tracim_backend.exceptions import PageOfPreviewNotFound
32
from tracim_backend.exceptions import PreviewDimNotAllowed
33
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
34
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
35
from tracim_backend.exceptions import EmptyLabelNotAllowed
36
from tracim_backend.exceptions import ContentNotFound
37
from tracim_backend.exceptions import WorkspacesDoNotMatch
38
from tracim_backend.lib.utils.utils import current_date_for_filename
39
from tracim_backend.app_models.contents import CONTENT_STATUS
40
from tracim_backend.app_models.contents import ContentType
41
from tracim_backend.app_models.contents import CONTENT_TYPES
42
from tracim_backend.models.revision_protection import new_revision
43
from tracim_backend.models.auth import User
44
from tracim_backend.models.data import ActionDescription
45
from tracim_backend.models.data import ContentRevisionRO
46
from tracim_backend.models.data import Content
47
48
from tracim_backend.models.data import NodeTreeItem
49
from tracim_backend.models.data import RevisionReadStatus
50
from tracim_backend.models.data import UserRoleInWorkspace
51
from tracim_backend.models.data import Workspace
52
from tracim_backend.lib.utils.translation import Translator
53
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
54
from tracim_backend.models.context_models import RevisionInContext
55
from tracim_backend.models.context_models import PreviewAllowedDim
56
from tracim_backend.models.context_models import ContentInContext
57
58
__author__ = 'damien'
59
60
61
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
62
def compare_content_for_sorting_by_type_and_name(
63
        content1: Content,
64
        content2: Content
65
) -> int:
66
    """
67
    :param content1:
68
    :param content2:
69
    :return:    1 if content1 > content2
70
                -1 if content1 < content2
71
                0 if content1 = content2
72
    """
73
74
    if content1.type == content2.type:
75
        if content1.get_label().lower()>content2.get_label().lower():
76
            return 1
77
        elif content1.get_label().lower()<content2.get_label().lower():
78
            return -1
79
        return 0
80
    else:
81
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
82
        content_type_order = [
83
            CONTENT_TYPES.Folder.slug,
84
            CONTENT_TYPES.Page.slug,
85
            CONTENT_TYPES.Thread.slug,
86
            CONTENT_TYPES.File.slug,
87
        ]
88
89
        content_1_type_index = content_type_order.index(content1.type)
90
        content_2_type_index = content_type_order.index(content2.type)
91
        result = content_1_type_index - content_2_type_index
92
93
        if result < 0:
94
            return -1
95
        elif result > 0:
96
            return 1
97
        else:
98
            return 0
99
100
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
101
def compare_tree_items_for_sorting_by_type_and_name(
102
        item1: NodeTreeItem,
103
        item2: NodeTreeItem
104
) -> int:
105
    return compare_content_for_sorting_by_type_and_name(item1.node, item2.node)
106
107
108
class ContentApi(object):
109
110
    SEARCH_SEPARATORS = ',| '
111
    SEARCH_DEFAULT_RESULT_NB = 50
112
113
    # DISPLAYABLE_CONTENTS = (
114
    #     CONTENT_TYPES.Folder.slug,
115
    #     CONTENT_TYPES.File.slug,
116
    #     CONTENT_TYPES.Comment.slug,
117
    #     CONTENT_TYPES.Thread.slug,
118
    #     CONTENT_TYPES.Page.slug,
119
    #     CONTENT_TYPES.Page.slugLegacy,
120
    #     ContentType.MarkdownPage,
121
    # )
122
123
    def __init__(
124
            self,
125
            session: Session,
126
            current_user: typing.Optional[User],
127
            config,
128
            show_archived: bool = False,
129
            show_deleted: bool = False,
130
            show_temporary: bool = False,
131
            show_active: bool = True,
132
            all_content_in_treeview: bool = True,
133
            force_show_all_types: bool = False,
134
            disable_user_workspaces_filter: bool = False,
135
    ) -> None:
136
        self._session = session
137
        self._user = current_user
138
        self._config = config
139
        self._user_id = current_user.user_id if current_user else None
140
        self._show_archived = show_archived
141
        self._show_deleted = show_deleted
142
        self._show_temporary = show_temporary
143
        self._show_active = show_active
144
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
145
        self._force_show_all_types = force_show_all_types
146
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
147
        self.preview_manager = PreviewManager(self._config.PREVIEW_CACHE_DIR, create_folder=True)  # nopep8
148
        default_lang = None
149
        if self._user:
150
            default_lang = self._user.lang
151
        if not default_lang:
152
            default_lang = DEFAULT_FALLBACK_LANG
153
        self.translator = Translator(app_config=self._config, default_lang=default_lang)  # nopep8
154
155
    @contextmanager
156
    def show(
157
            self,
158
            show_archived: bool=False,
159
            show_deleted: bool=False,
160
            show_temporary: bool=False,
161
    ) -> typing.Generator['ContentApi', None, None]:
162
        """
163
        Use this method as context manager to update show_archived,
164
        show_deleted and show_temporary properties during context.
165
        :param show_archived: show archived contents
166
        :param show_deleted:  show deleted contents
167
        :param show_temporary:  show temporary contents
168
        """
169
        previous_show_archived = self._show_archived
170
        previous_show_deleted = self._show_deleted
171
        previous_show_temporary = self._show_temporary
172
173
        try:
174
            self._show_archived = show_archived
175
            self._show_deleted = show_deleted
176
            self._show_temporary = show_temporary
177
            yield self
178
        finally:
179
            self._show_archived = previous_show_archived
180
            self._show_deleted = previous_show_deleted
181
            self._show_temporary = previous_show_temporary
182
183
    def get_content_in_context(self, content: Content) -> ContentInContext:
184
        return ContentInContext(content, self._session, self._config, self._user)  # nopep8
185
186
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
187
        # TODO - G.M - 2018-06-173 - create revision in context object
188
        return RevisionInContext(revision, self._session, self._config, self._user) # nopep8
189
    
190
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
191
        """
192
        Return the Content/ContentRevision query join condition
193
        :return: Content/ContentRevision query join condition
194
        """
195
        return and_(Content.id == ContentRevisionRO.content_id,
196
                    ContentRevisionRO.revision_id == self._session.query(
197
                        ContentRevisionRO.revision_id)
198
                    .filter(ContentRevisionRO.content_id == Content.id)
199
                    .order_by(ContentRevisionRO.revision_id.desc())
200
                    .limit(1)
201
                    .correlate(Content))
202
203
    def get_canonical_query(self) -> Query:
204
        """
205
        Return the Content/ContentRevision base query who join these table on the last revision.
206
        :return: Content/ContentRevision Query
207
        """
208
        return self._session.query(Content)\
209
            .join(ContentRevisionRO, self._get_revision_join())
210
211
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
212
    @classmethod
213
    def sort_tree_items(
214
        cls,
215
        content_list: typing.List[NodeTreeItem],
216
    )-> typing.List[NodeTreeItem]:
217
        news = []
218
        for item in content_list:
219
            news.append(item)
220
221
        content_list.sort(key=cmp_to_key(
222
            compare_tree_items_for_sorting_by_type_and_name,
223
        ))
224
225
        return content_list
226
227
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
228
    @classmethod
229
    def sort_content(
230
        cls,
231
        content_list: typing.List[Content],
232
    ) -> typing.List[Content]:
233
        content_list.sort(key=cmp_to_key(compare_content_for_sorting_by_type_and_name))
234
        return content_list
235
236
    def __real_base_query(
237
        self,
238
        workspace: Workspace = None,
239
    ) -> Query:
240
        result = self.get_canonical_query()
241
242
        # Exclude non displayable types
243
        if not self._force_show_all_types:
244
            result = result.filter(Content.type.in_(CONTENT_TYPES.query_allowed_types_slugs()))
245
246
        if workspace:
247
            result = result.filter(Content.workspace_id == workspace.workspace_id)
248
249
        # Security layer: if user provided, filter
250
        # with user workspaces privileges
251
        if self._user and not self._disable_user_workspaces_filter:
252
            user = self._session.query(User).get(self._user_id)
253
            # Filter according to user workspaces
254
            workspace_ids = [r.workspace_id for r in user.roles \
255
                             if r.role>=UserRoleInWorkspace.READER]
256
            result = result.filter(or_(
257
                Content.workspace_id.in_(workspace_ids),
258
                # And allow access to non workspace document when he is owner
259
                and_(
260
                    Content.workspace_id == None,
261
                    Content.owner_id == self._user_id,
262
                )
263
            ))
264
265
        return result
266
267
    def _base_query(self, workspace: Workspace=None) -> Query:
268
        result = self.__real_base_query(workspace)
269
270
        if not self._show_active:
271
            result = result.filter(or_(
272
                Content.is_deleted==True,
273
                Content.is_archived==True,
274
            ))
275
        if not self._show_deleted:
276
            result = result.filter(Content.is_deleted==False)
277
278
        if not self._show_archived:
279
            result = result.filter(Content.is_archived==False)
280
281
        if not self._show_temporary:
282
            result = result.filter(Content.is_temporary==False)
283
284
        return result
285
286
    def __revisions_real_base_query(
287
        self,
288
        workspace: Workspace=None,
289
    ) -> Query:
290
        result = self._session.query(ContentRevisionRO)
291
292
        # Exclude non displayable types
293
        if not self._force_show_all_types:
294
            result = result.filter(Content.type.in_(self.DISPLAYABLE_CONTENTS))
295
296
        if workspace:
297
            result = result.filter(ContentRevisionRO.workspace_id==workspace.workspace_id)
298
299
        if self._user:
300
            user = self._session.query(User).get(self._user_id)
301
            # Filter according to user workspaces
302
            workspace_ids = [r.workspace_id for r in user.roles \
303
                             if r.role>=UserRoleInWorkspace.READER]
304
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))
305
306
        return result
307
308
    def _revisions_base_query(
309
        self,
310
        workspace: Workspace=None,
311
    ) -> Query:
312
        result = self.__revisions_real_base_query(workspace)
313
314
        if not self._show_deleted:
315
            result = result.filter(ContentRevisionRO.is_deleted==False)
316
317
        if not self._show_archived:
318
            result = result.filter(ContentRevisionRO.is_archived==False)
319
320
        if not self._show_temporary:
321
            result = result.filter(Content.is_temporary==False)
322
323
        return result
324
325
    def _hard_filtered_base_query(
326
        self,
327
        workspace: Workspace=None,
328
    ) -> Query:
329
        """
330
        If set to True, then filterign on is_deleted and is_archived will also
331
        filter parent properties. This is required for search() function which
332
        also search in comments (for example) which may be 'not deleted' while
333
        the associated content is deleted
334
335
        :param hard_filtering:
336
        :return:
337
        """
338
        result = self.__real_base_query(workspace)
339
340
        if not self._show_deleted:
341
            parent = aliased(Content)
342
            result = result.join(parent, Content.parent).\
343
                filter(Content.is_deleted==False).\
344
                filter(parent.is_deleted==False)
345
346
        if not self._show_archived:
347
            parent = aliased(Content)
348
            result = result.join(parent, Content.parent).\
349
                filter(Content.is_archived==False).\
350
                filter(parent.is_archived==False)
351
352
        if not self._show_temporary:
353
            parent = aliased(Content)
354
            result = result.join(parent, Content.parent). \
355
                filter(Content.is_temporary == False). \
356
                filter(parent.is_temporary == False)
357
358
        return result
359
360
    def get_base_query(
361
        self,
362
        workspace: Workspace,
363
    ) -> Query:
364
        return self._base_query(workspace)
365
366
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
367
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
368
    #     """
369
    #     This method returns child items (folders or items) for left bar treeview.
370
    # 
371
    #     :param parent:
372
    #     :param workspace:
373
    #     :param filter_by_allowed_content_types:
374
    #     :param removed_item_ids:
375
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
376
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
377
    #     :return:
378
    #     """
379
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
380
    #     removed_item_ids = removed_item_ids or []  # FDV
381
    # 
382
    #     if not allowed_node_types:
383
    #         allowed_node_types = [CONTENT_TYPES.Folder.slug]
384
    #     elif allowed_node_types==CONTENT_TYPES.Any_SLUG:
385
    #         allowed_node_types = ContentType.all()
386
    # 
387
    #     parent_id = parent.content_id if parent else None
388
    #     folders = self._base_query(workspace).\
389
    #         filter(Content.parent_id==parent_id).\
390
    #         filter(Content.type.in_(allowed_node_types)).\
391
    #         filter(Content.content_id.notin_(removed_item_ids)).\
392
    #         all()
393
    # 
394
    #     if not filter_by_allowed_content_types or \
395
    #                     len(filter_by_allowed_content_types)<=0:
396
    #         # Standard case for the left treeview: we want to show all contents
397
    #         # in the left treeview... so we still filter because for example
398
    #         # comments must not appear in the treeview
399
    #         return [folder for folder in folders \
400
    #                 if folder.type in ContentType.allowed_types_for_folding()]
401
    # 
402
    #     # Now this is a case of Folders only (used for moving content)
403
    #     # When moving a content, you must get only folders that allow to be filled
404
    #     # with the type of content you want to move
405
    #     result = []
406
    #     for folder in folders:
407
    #         for allowed_content_type in filter_by_allowed_content_types:
408
    # 
409
    #             is_folder = folder.type == CONTENT_TYPES.Folder.slug
410
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
411
    # 
412
    #             if is_folder and content_type__allowed:
413
    #                 result.append(folder)
414
    #                 break
415
    # 
416
    #     return result
417
418
    def create(self, content_type_slug: str, workspace: Workspace, parent: Content=None, label: str = '', filename: str = '', do_save=False, is_temporary: bool=False, do_notify=True) -> Content:
419
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
420
        assert content_type_slug != CONTENT_TYPES.Any_SLUG
421
        assert not (label and filename)
422
423
        if content_type_slug == CONTENT_TYPES.Folder.slug and not label:
424
            label = self.generate_folder_label(workspace, parent)
425
426
        # TODO BS 2018-08-13: Despite that workspace is required, create_comment
427
        # can call here with None. Must update create_comment tu require the
428
        # workspace.
429
        if not workspace:
430
            workspace = parent.workspace
431
432
        content_type = CONTENT_TYPES.get_one_by_slug(content_type_slug)
433
        if parent and parent.properties and 'allowed_content' in parent.properties:
434
            if content_type.slug not in parent.properties['allowed_content'] or not parent.properties['allowed_content'][content_type.slug]:
435
                raise UnallowedSubContent(' SubContent of type {subcontent_type}  not allowed in content {content_id}'.format(  # nopep8
436
                    subcontent_type=content_type.slug,
437
                    content_id=parent.content_id,
438
                ))
439
        if not workspace and parent:
440
            workspace = parent.workspace
441
442
        if workspace:
443
            if content_type.slug not in workspace.get_allowed_content_types():
444
                raise UnallowedSubContent(
445
                    ' SubContent of type {subcontent_type}  not allowed in workspace {content_id}'.format(  # nopep8
446
                        subcontent_type=content_type.slug,
447
                        content_id=workspace.workspace_id,
448
                    )
449
                )
450
451
        content = Content()
452
453
        if filename:
454
            # INFO - G.M - 2018-07-04 - File_name setting automatically
455
            # set label and file_extension
456
            content.file_name = label
457
        elif label:
458
            content.label = label
459
        else:
460
            if content_type_slug == CONTENT_TYPES.Comment.slug:
461
                # INFO - G.M - 2018-07-16 - Default label for comments is
462
                # empty string.
463
                content.label = ''
464
            else:
465
                raise EmptyLabelNotAllowed('Content of this type should have a valid label')  # nopep8
466
467
        content.owner = self._user
468
        content.parent = parent
469
470
        content.workspace = workspace
471
        content.type = content_type.slug
472
        content.is_temporary = is_temporary
473
        content.revision_type = ActionDescription.CREATION
474
475
        if content.type in (
476
                CONTENT_TYPES.Page.slug,
477
                CONTENT_TYPES.Thread.slug,
478
        ):
479
            content.file_extension = '.html'
480
481
        if do_save:
482
            self._session.add(content)
483
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
484
        return content
485
486
    def create_comment(self, workspace: Workspace=None, parent: Content=None, content:str ='', do_save=False) -> Content:
487
        # TODO: check parent allowed_type and workspace allowed_ type
488
        assert parent and parent.type != CONTENT_TYPES.Folder.slug
489
        if not content:
490
            raise EmptyCommentContentNotAllowed()
491
492
        item = self.create(
493
            content_type_slug=CONTENT_TYPES.Comment.slug,
494
            workspace=workspace,
495
            parent=parent,
496
            do_notify=False,
497
            do_save=False,
498
            label='',
499
        )
500
        item.description = content
501
        item.revision_type = ActionDescription.COMMENT
502
503
        if do_save:
504
            self.save(item, ActionDescription.COMMENT)
505
        return item
506
507
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
508
        """
509
        This method is a hack to convert a node revision item into a node
510
        :param content_id:
511
        :param content_type:
512
        :param workspace:
513
        :param revision_id:
514
        :return:
515
        """
516
517
        content = self.get_one(content_id, content_type, workspace)
518
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id==revision_id).one()
519
520
        if revision.content_id==content.content_id:
521
            content.revision_to_serialize = revision.revision_id
522
        else:
523
            raise ValueError('Revision not found for given content')
524
525
        return content
526
527
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
528
529
        if not content_id:
530
            return None
531
532
        base_request = self._base_query(workspace).filter(Content.content_id==content_id)
533
534
        if content_type!=CONTENT_TYPES.Any_SLUG:
535
            base_request = base_request.filter(Content.type==content_type)
536
537
        if parent:
538
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
539
540
        try:
541
            content = base_request.one()
542
        except NoResultFound as exc:
543
            # TODO - G.M - 2018-07-16 - Add better support for all different
544
            # error case who can happened here
545
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
546
            # wrong access to this workspace, wrong base filter according
547
            # to content_status.
548
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
549
        return content
550
551
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
552
        """
553
        This method allow us to get directly any revision with its id
554
        :param revision_id: The content's revision's id that we want to return
555
        :param content: The content related to the revision, if None do not
556
        check if revision is related to this content.
557
        :return: An item Content linked with the correct revision
558
        """
559
        assert revision_id is not None# DYN_REMOVE
560
561
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
562
        if content and revision.content_id != content.content_id:
563
            raise RevisionDoesNotMatchThisContent(
564
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
565
                    revision_id=revision.revision_id,
566
                    content_id=content.content_id,
567
                    )
568
            )
569
        return revision
570
571
    # INFO - A.P - 2017-07-03 - python file object getter
572
    # in case of we cook a version of preview manager that allows a pythonic
573
    # access to files
574
    # def get_one_revision_file(self, revision_id: int = None):
575
    #     """
576
    #     This function allows us to directly get a Python file object from its
577
    #     revision identifier.
578
    #     :param revision_id: The revision id of the file we want to return
579
    #     :return: The corresponding Python file object
580
    #     """
581
    #     revision = self.get_one_revision(revision_id)
582
    #     return DepotManager.get().get(revision.depot_file)
583
584
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
585
        """
586
        This method allows us to directly get a file path from its revision
587
        identifier.
588
        :param revision_id: The revision id of the filepath we want to return
589
        :return: The corresponding filepath
590
        """
591
        revision = self.get_one_revision(revision_id)
592
        depot = DepotManager.get()
593
        depot_stored_file = depot.get(revision.depot_file)  # type: StoredFile
594
        depot_file_path = depot_stored_file._file_path  # type: str
595
        return depot_file_path
596
597
    def get_one_by_label_and_parent(
598
            self,
599
            content_label: str,
600
            content_parent: Content=None,
601
    ) -> Content:
602
        """
603
        This method let us request the database to obtain a Content with its name and parent
604
        :param content_label: Either the content's label or the content's filename if the label is None
605
        :param content_parent: The parent's content
606
        :param workspace: The workspace's content
607
        :return The corresponding Content
608
        """
609
        workspace = content_parent.workspace if content_parent else None
610
        query = self._base_query(workspace)
611
        parent_id = content_parent.content_id if content_parent else None
612
        query = query.filter(Content.parent_id == parent_id)
613
614
        file_name, file_extension = os.path.splitext(content_label)
615
616
        return query.filter(
617
            or_(
618
                and_(
619
                    Content.type == CONTENT_TYPES.File.slug,
620
                    Content.label == file_name,
621
                    Content.file_extension == file_extension,
622
                ),
623
                and_(
624
                    Content.type == CONTENT_TYPES.Thread.slug,
625
                    Content.label == file_name,
626
                ),
627
                and_(
628
                    Content.type == CONTENT_TYPES.Page.slug,
629
                    Content.label == file_name,
630
                ),
631
                and_(
632
                    Content.type == CONTENT_TYPES.Folder.slug,
633
                    Content.label == content_label,
634
                ),
635
            )
636
        ).one()
637
638
    def get_one_by_label_and_parent_labels(
639
            self,
640
            content_label: str,
641
            workspace: Workspace,
642
            content_parent_labels: [str]=None,
643
    ):
644
        """
645
        Return content with it's label, workspace and parents labels (optional)
646
        :param content_label: label of content (label or file_name)
647
        :param workspace: workspace containing all of this
648
        :param content_parent_labels: Ordered list of labels representing path
649
            of folder (without workspace label).
650
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
651
        :return: Found Content
652
        """
653
        query = self._base_query(workspace)
654
        parent_folder = None
655
656
        # Grab content parent folder if parent path given
657
        if content_parent_labels:
658
            parent_folder = self.get_folder_with_workspace_path_labels(
659
                content_parent_labels,
660
                workspace,
661
            )
662
663
        # Build query for found content by label
664
        content_query = self.filter_query_for_content_label_as_path(
665
            query=query,
666
            content_label_as_file=content_label,
667
        )
668
669
        # Modify query to apply parent folder filter if any
670
        if parent_folder:
671
            content_query = content_query.filter(
672
                Content.parent_id == parent_folder.content_id,
673
            )
674
        else:
675
            content_query = content_query.filter(
676
                Content.parent_id == None,
677
            )
678
679
        # Filter with workspace
680
        content_query = content_query.filter(
681
            Content.workspace_id == workspace.workspace_id,
682
        )
683
684
        # Return the content
685
        return content_query\
686
            .order_by(
687
                Content.revision_id.desc(),
688
            )\
689
            .one()
690
691
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
692
    def get_folder_with_workspace_path_labels(
693
            self,
694
            path_labels: [str],
695
            workspace: Workspace,
696
    ) -> Content:
697
        """
698
        Return a Content folder for given relative path.
699
        TODO BS 20161124: Not safe if web interface allow folder duplicate names
700
        :param path_labels: List of labels representing path of folder
701
        (without workspace label).
702
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
703
        :param workspace: workspace of folders
704
        :return: Content folder
705
        """
706
        query = self._base_query(workspace)
707
        folder = None
708
709
        for label in path_labels:
710
            # Filter query on label
711
            folder_query = query \
712
                .filter(
713
                    Content.type == CONTENT_TYPES.Folder.slug,
714
                    Content.label == label,
715
                    Content.workspace_id == workspace.workspace_id,
716
                )
717
718
            # Search into parent folder (if already deep)
719
            if folder:
720
                folder_query = folder_query\
721
                    .filter(
722
                        Content.parent_id == folder.content_id,
723
                    )
724
            else:
725
                folder_query = folder_query \
726
                    .filter(Content.parent_id == None)
727
728
            # Get thirst corresponding folder
729
            folder = folder_query \
730
                .order_by(Content.revision_id.desc()) \
731
                .one()
732
733
        return folder
734
735
    def filter_query_for_content_label_as_path(
736
            self,
737
            query: Query,
738
            content_label_as_file: str,
739
            is_case_sensitive: bool = False,
740
    ) -> Query:
741
        """
742
        Apply normalised filters to found Content corresponding as given label.
743
        :param query: query to modify
744
        :param content_label_as_file: label in this
745
        FILE version, use Content.get_label_as_file().
746
        :param is_case_sensitive: Take care about case or not
747
        :return: modified query
748
        """
749
        file_name, file_extension = os.path.splitext(content_label_as_file)
750
751
        label_filter = Content.label == content_label_as_file
752
        file_name_filter = Content.label == file_name
753
        file_extension_filter = Content.file_extension == file_extension
754
755
        if not is_case_sensitive:
756
            label_filter = func.lower(Content.label) == \
757
                           func.lower(content_label_as_file)
758
            file_name_filter = func.lower(Content.label) == \
759
                               func.lower(file_name)
760
            file_extension_filter = func.lower(Content.file_extension) == \
761
                                    func.lower(file_extension)
762
763
        return query.filter(or_(
764
            and_(
765
                Content.type == CONTENT_TYPES.File.slug,
766
                file_name_filter,
767
                file_extension_filter,
768
            ),
769
            and_(
770
                Content.type == CONTENT_TYPES.Thread.slug,
771
                file_name_filter,
772
                file_extension_filter,
773
            ),
774
            and_(
775
                Content.type == CONTENT_TYPES.Page.slug,
776
                file_name_filter,
777
                file_extension_filter,
778
            ),
779
            and_(
780
                Content.type == CONTENT_TYPES.Folder.slug,
781
                label_filter,
782
            ),
783
        ))
784
785
786
    def get_pdf_preview_path(
787
            self,
788
            content_id: int,
789
            revision_id: int,
790
            page: int
791
    ) -> str:
792
        """
793
        Get pdf preview of revision of content
794
        :param content_id: id of content
795
        :param revision_id: id of content revision
796
        :param page: page number of the preview, useful for multipage content
797
        :return: preview_path as string
798
        """
799
        file_path = self.get_one_revision_filepath(revision_id)
800
        if page >= self.preview_manager.get_page_nb(file_path):
801
            raise PageOfPreviewNotFound(
802
                'page {page} of content {content_id} does not exist'.format(
803
                    page=page,
804
                    content_id=content_id
805
                ),
806
            )
807
        jpg_preview_path = self.preview_manager.get_pdf_preview(
808
            file_path,
809
            page=page
810
        )
811
        return jpg_preview_path
812
813
    def get_full_pdf_preview_path(self, revision_id: int) -> str:
814
        """
815
        Get full(multiple page) pdf preview of revision of content
816
        :param revision_id: id of revision
817
        :return: path of the full pdf preview of this revision
818
        """
819
        file_path = self.get_one_revision_filepath(revision_id)
820
        pdf_preview_path = self.preview_manager.get_pdf_preview(file_path)
821
        return pdf_preview_path
822
823
    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
824
        """
825
        Get jpg preview allowed dimensions and strict bool param.
826
        """
827
        return PreviewAllowedDim(
828
            self._config.PREVIEW_JPG_RESTRICTED_DIMS,
829
            self._config.PREVIEW_JPG_ALLOWED_DIMS,
830
        )
831
832
    def get_jpg_preview_path(
833
        self,
834
        content_id: int,
835
        revision_id: int,
836
        page: int,
837
        width: int = None,
838
        height: int = None,
839
    ) -> str:
840
        """
841
        Get jpg preview of revision of content
842
        :param content_id: id of content
843
        :param revision_id: id of content revision
844
        :param page: page number of the preview, useful for multipage content
845
        :param width: width in pixel
846
        :param height: height in pixel
847
        :return: preview_path as string
848
        """
849
        file_path = self.get_one_revision_filepath(revision_id)
850
        if page >= self.preview_manager.get_page_nb(file_path):
851
            raise Exception(
852
                'page {page} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
853
                    page=page,
854
                    revision_id=revision_id,
855
                    content_id=content_id,
856
                ),
857
            )
858
        if not width and not height:
859
            width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
860
            height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height
861
862
        allowed_dim = False
863
        for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS:
864
            if width == preview_dim.width and height == preview_dim.height:
865
                allowed_dim = True
866
                break
867
868
        if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
869
            raise PreviewDimNotAllowed(
870
                'Size {width}x{height} is not allowed for jpeg preview'.format(
871
                    width=width,
872
                    height=height,
873
                )
874
            )
875
        jpg_preview_path = self.preview_manager.get_jpeg_preview(
876
            file_path,
877
            page=page,
878
            width=width,
879
            height=height,
880
        )
881
        return jpg_preview_path
882
883
    def _get_all_query(
884
        self,
885
        parent_id: int = None,
886
        content_type_slug: str = CONTENT_TYPES.Any_SLUG,
887
        workspace: Workspace = None
888
    ) -> Query:
889
        """
890
        Extended filter for better "get all data" query
891
        :param parent_id: filter by parent_id
892
        :param content_type_slug: filter by content_type slug
893
        :param workspace: filter by workspace
894
        :return:
895
        """
896
        assert parent_id is None or isinstance(parent_id, int)
897
        assert content_type_slug is not None
898
        resultset = self._base_query(workspace)
899
900
        if content_type_slug != CONTENT_TYPES.Any_SLUG:
901
            # INFO - G.M - 2018-07-05 - convert with
902
            #  content type object to support legacy slug
903
            content_type_object = CONTENT_TYPES.get_one_by_slug(content_type_slug)
904
            all_slug_alias = [content_type_object.slug]
905
            if content_type_object.slug_alias:
906
                all_slug_alias.extend(content_type_object.slug_alias)
907
            resultset = resultset.filter(Content.type.in_(all_slug_alias))
908
909
        if parent_id:
910
            resultset = resultset.filter(Content.parent_id==parent_id)
911
        if parent_id == 0 or parent_id is False:
912
            resultset = resultset.filter(Content.parent_id == None)
913
914
        return resultset
915
916
    def get_all(self, parent_id: int=None, content_type: str=CONTENT_TYPES.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
917
        return self._get_all_query(parent_id, content_type, workspace).all()
918
919
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
920
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
921
    #     """
922
    #     Return parent_id childs of given content_types
923
    #     :param parent_id: parent id
924
    #     :param content_types: list of types
925
    #     :param workspace: workspace filter
926
    #     :return: list of content
927
    #     """
928
    #     resultset = self._base_query(workspace)
929
    #     resultset = resultset.filter(Content.type.in_(content_types))
930
    #
931
    #     if parent_id:
932
    #         resultset = resultset.filter(Content.parent_id==parent_id)
933
    #     if parent_id is False:
934
    #         resultset = resultset.filter(Content.parent_id == None)
935
    #
936
    #     return resultset.all()
937
938
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
939
    # TODO find an other name to filter on is_deleted / is_archived
940
    def get_all_with_filter(self, parent_id: int=None, content_type: str=CONTENT_TYPES.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
941
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
942
        assert content_type is not None# DYN_REMOVE
943
        assert isinstance(content_type, str) # DYN_REMOVE
944
945
        resultset = self._base_query(workspace)
946
947
        if content_type != CONTENT_TYPES.Any_SLUG:
948
            resultset = resultset.filter(Content.type==content_type)
949
950
        resultset = resultset.filter(Content.is_deleted == self._show_deleted)
951
        resultset = resultset.filter(Content.is_archived == self._show_archived)
952
        resultset = resultset.filter(Content.is_temporary == self._show_temporary)
953
954
        resultset = resultset.filter(Content.parent_id==parent_id)
955
956
        return resultset.all()
957
958
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
959
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
960
        assert content_type is not None# DYN_REMOVE
961
962
        resultset = self._base_query(workspace)
963
964
        if content_type != CONTENT_TYPES.Any_SLUG:
965
            resultset = resultset.filter(Content.type==content_type)
966
967
        return resultset.all()
968
969
    def get_last_unread(self, parent_id: int, content_type: str,
970
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
971
        assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
972
        assert content_type is not None# DYN_REMOVE
973
        assert isinstance(content_type, str) # DYN_REMOVE
974
975
        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
976
            .filter(RevisionReadStatus.user_id==self._user_id)
977
978
        not_read_revisions = self._revisions_base_query(workspace) \
979
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
980
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
981
            .filter(Workspace.is_deleted.is_(False)) \
982
            .subquery()
983
984
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
985
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
986
    #     assert content_type is not None# DYN_REMOVE
987
    #
988
    #     resultset = self._base_query(workspace)
989
    #
990
    #     if content_type != CONTENT_TYPES.Any_SLUG:
991
    #         resultset = resultset.filter(Content.type==content_type)
992
    #
993
    #     return resultset.all()
994
995
    def get_last_active(
996
            self,
997
            workspace: Workspace=None,
998
            limit: typing.Optional[int]=None,
999
            before_content: typing.Optional[Content]= None,
1000
            content_ids: typing.Optional[typing.List[int]] = None,
1001
    ) -> typing.List[Content]:
1002
        """
1003
        get contents list sorted by last update
1004
        (last modification of content itself or one of this comment)
1005
        :param workspace: Workspace to check
1006
        :param limit: maximum number of elements to return
1007
        :param before_content: last_active content are only those updated
1008
         before this content given.
1009
        :param content_ids: restrict selection to some content ids and
1010
        related Comments
1011
        :return: list of content
1012
        """
1013
1014
        resultset = self._get_all_query(
1015
            workspace=workspace,
1016
        )
1017
        if content_ids:
1018
            resultset = resultset.filter(
1019
                or_(
1020
                    Content.content_id.in_(content_ids),
1021
                    and_(
1022
                        Content.parent_id.in_(content_ids),
1023
                        Content.type == CONTENT_TYPES.Comment.slug
1024
                    )
1025
                )
1026
            )
1027
1028
        resultset = resultset.order_by(desc(Content.updated))
1029
1030
        active_contents = []
1031
        too_recent_content = []
1032
        before_content_find = False
1033
        for content in resultset:
1034
            related_active_content = None
1035
            if CONTENT_TYPES.Comment.slug == content.type:
1036
                related_active_content = content.parent
1037
            else:
1038
                related_active_content = content
1039
1040
            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
1041
            # issue with comments
1042
            if not self._show_deleted and related_active_content.is_deleted:
1043
                continue
1044
            if not self._show_archived and related_active_content.is_archived:
1045
                continue
1046
1047
            if related_active_content not in active_contents and related_active_content not in too_recent_content:  # nopep8
1048
1049
                if not before_content or before_content_find:
1050
                    active_contents.append(related_active_content)
1051
                else:
1052
                    too_recent_content.append(related_active_content)
1053
1054
                if before_content and related_active_content == before_content:
1055
                    before_content_find = True
1056
1057
            if limit and len(active_contents) >= limit:
1058
                break
1059
1060
        return active_contents
1061
1062
    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
1063
    # usable and efficient for tracim v2 to get content with read/unread status
1064
    # instead of relying on has_new_information_for()
1065
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
1066
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
1067
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
1068
    #     assert content_type is not None# DYN_REMOVE
1069
    #     assert isinstance(content_type, str) # DYN_REMOVE
1070
    #
1071
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
1072
    #         .filter(RevisionReadStatus.user_id==self._user_id)
1073
    #
1074
    #     not_read_revisions = self._revisions_base_query(workspace) \
1075
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
1076
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
1077
    #         .filter(Workspace.is_deleted.is_(False)) \
1078
    #         .subquery()
1079
    #
1080
    #     not_read_content_ids_query = self._session.query(
1081
    #         distinct(not_read_revisions.c.content_id)
1082
    #     )
1083
    #     not_read_content_ids = list(map(
1084
    #         itemgetter(0),
1085
    #         not_read_content_ids_query,
1086
    #     ))
1087
    #
1088
    #     not_read_contents = self._base_query(workspace) \
1089
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
1090
    #         .order_by(desc(Content.updated))
1091
    #
1092
    #     if content_type != CONTENT_TYPES.Any_SLUG:
1093
    #         not_read_contents = not_read_contents.filter(
1094
    #             Content.type==content_type)
1095
    #     else:
1096
    #         not_read_contents = not_read_contents.filter(
1097
    #             Content.type!=CONTENT_TYPES.Folder.slug)
1098
    #
1099
    #     if parent_id:
1100
    #         not_read_contents = not_read_contents.filter(
1101
    #             Content.parent_id==parent_id)
1102
    #
1103
    #     result = []
1104
    #     for item in not_read_contents:
1105
    #         new_item = None
1106
    #         if CONTENT_TYPES.Comment.slug == item.type:
1107
    #             new_item = item.parent
1108
    #         else:
1109
    #             new_item = item
1110
    #
1111
    #         # INFO - D.A. - 2015-05-20
1112
    #         # We do not want to show only one item if the last 10 items are
1113
    #         # comments about one thread for example
1114
    #         if new_item not in result:
1115
    #             result.append(new_item)
1116
    #
1117
    #         if len(result) >= limit:
1118
    #             break
1119
    #
1120
    #     return result
1121
1122
    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
1123
        """
1124
        :param content: the given content instance
1125
        :param allowed_content_dict: must be something like this:
1126
            dict(
1127
                folder = True
1128
                thread = True,
1129
                file = False,
1130
                page = True
1131
            )
1132
        :return: nothing
1133
        """
1134
        properties = content.properties.copy()
1135
        properties['allowed_content'] = allowed_content_dict
1136
        content.properties = properties
1137
1138
    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
1139
        """
1140
        :param content: the given content instance
1141
        :param allowed_content_type_slug_list: list of content_type_slug to
1142
        accept as subcontent.
1143
        :return: nothing
1144
        """
1145
        allowed_content_dict = {}
1146
        for allowed_content_type_slug in allowed_content_type_slug_list:
1147
            if allowed_content_type_slug not in CONTENT_TYPES.endpoint_allowed_types_slug():
1148
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
1149
            allowed_content_dict[allowed_content_type_slug] = True
1150
1151
        self._set_allowed_content(content, allowed_content_dict)
1152
1153
    def restore_content_default_allowed_content(self, content: Content) -> None:
1154
        """
1155
        Return to default allowed_content_types
1156
        :param content: the given content instance
1157
        :return: nothing
1158
        """
1159
        if content._properties and 'allowed_content' in content._properties:
1160
            properties = content.properties.copy()
1161
            del properties['allowed_content']
1162
            content.properties = properties
1163
1164
    def set_status(self, content: Content, new_status: str):
1165
        if new_status in CONTENT_STATUS.get_all_slugs_values():
1166
            content.status = new_status
1167
            content.revision_type = ActionDescription.STATUS_UPDATE
1168
        else:
1169
            raise ValueError('The given value {} is not allowed'.format(new_status))
1170
1171
    def move(self,
1172
             item: Content,
1173
             new_parent: Content,
1174
             must_stay_in_same_workspace: bool=True,
1175
             new_workspace: Workspace=None,
1176
    ):
1177
        if must_stay_in_same_workspace:
1178
            if new_parent and new_parent.workspace_id != item.workspace_id:
1179
                raise ValueError('the item should stay in the same workspace')
1180
1181
        item.parent = new_parent
1182
        if new_workspace:
1183
            item.workspace = new_workspace
1184
            if new_parent and \
1185
                    new_parent.workspace_id != new_workspace.workspace_id:
1186
                raise WorkspacesDoNotMatch(
1187
                    'new parent workspace and new workspace should be the same.'
1188
                )
1189
        else:
1190
            if new_parent:
1191
                item.workspace = new_parent.workspace
1192
1193
        item.revision_type = ActionDescription.MOVE
1194
1195
    def copy(
1196
        self,
1197
        item: Content,
1198
        new_parent: Content=None,
1199
        new_label: str=None,
1200
        do_save: bool=True,
1201
        do_notify: bool=True,
1202
    ) -> Content:
1203
        """
1204
        Copy nearly all content, revision included. Children not included, see
1205
        "copy_children" for this.
1206
        :param item: Item to copy
1207
        :param new_parent: new parent of the new copied item
1208
        :param new_label: new label of the new copied item
1209
        :param do_notify: notify copy or not
1210
        :return: Newly copied item
1211
        """
1212
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
1213
            # TODO - G.M - 08-03-2018 - Use something else than value error
1214
            raise ValueError("You can't copy file into itself")
1215
        if new_parent:
1216
            workspace = new_parent.workspace
1217
            parent = new_parent
1218
        else:
1219
            workspace = item.workspace
1220
            parent = item.parent
1221
        label = new_label or item.label
1222
1223
        content = item.copy(parent)
1224
        # INFO - GM - 15-03-2018 - add "copy" revision
1225
        with new_revision(
1226
            session=self._session,
1227
            tm=transaction.manager,
1228
            content=content,
1229
            force_create_new_revision=True
1230
        ) as rev:
1231
            rev.parent = parent
1232
            rev.workspace = workspace
1233
            rev.label = label
1234
            rev.revision_type = ActionDescription.COPY
1235
            rev.properties['origin'] = {
1236
                'content': item.id,
1237
                'revision': item.last_revision.revision_id,
1238
            }
1239
        if do_save:
1240
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
1241
        return content
1242
1243
    def copy_children(self, origin_content: Content, new_content: Content):
1244
        for child in origin_content.children:
1245
            self.copy(child, new_content)
1246
1247
    def move_recursively(self, item: Content,
1248
                         new_parent: Content, new_workspace: Workspace):
1249
        self.move(item, new_parent, False, new_workspace)
1250
        self.save(item, do_notify=False)
1251
1252
        for child in item.children:
1253
            with new_revision(
1254
                session=self._session,
1255
                tm=transaction.manager,
1256
                content=child
1257
            ):
1258
                self.move_recursively(child, item, new_workspace)
1259
        return
1260
1261
    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
1262
        if item.label==new_label and item.description==new_content:
1263
            # TODO - G.M - 20-03-2018 - Fix internatization for webdav access.
1264
            # Internatization disabled in libcontent for now.
1265
            raise SameValueError('The content did not changed')
1266
        if not new_label:
1267
            raise EmptyLabelNotAllowed()
1268
        item.owner = self._user
1269
        item.label = new_label
1270
        item.description = new_content if new_content else item.description # TODO: convert urls into links
1271
        item.revision_type = ActionDescription.EDITION
1272
        return item
1273
1274
    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
1275
        if new_mimetype == item.file_mimetype and \
1276
                new_content == item.depot_file.file.read():
1277
            raise SameValueError('The content did not changed')
1278
        item.owner = self._user
1279
        item.file_name = new_filename
1280
        item.file_mimetype = new_mimetype
1281
        item.depot_file = FileIntent(
1282
            new_content,
1283
            new_filename,
1284
            new_mimetype,
1285
        )
1286
        item.revision_type = ActionDescription.REVISION
1287
        return item
1288
1289
    def archive(self, content: Content):
1290
        content.owner = self._user
1291
        content.is_archived = True
1292
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1293
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1294
        # un-archiving file.
1295
        content.label = '{label}-{action}-{date}'.format(
1296
            label=content.label,
1297
            action='archived',
1298
            date=current_date_for_filename()
1299
        )
1300
        content.revision_type = ActionDescription.ARCHIVING
1301
1302
    def unarchive(self, content: Content):
1303
        content.owner = self._user
1304
        content.is_archived = False
1305
        content.revision_type = ActionDescription.UNARCHIVING
1306
1307
    def delete(self, content: Content):
1308
        content.owner = self._user
1309
        content.is_deleted = True
1310
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
1311
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
1312
        # un-deleting file.
1313
        content.label = '{label}-{action}-{date}'.format(
1314
            label=content.label,
1315
            action='deleted',
1316
            date=current_date_for_filename()
1317
        )
1318
        content.revision_type = ActionDescription.DELETION
1319
1320
    def undelete(self, content: Content):
1321
        content.owner = self._user
1322
        content.is_deleted = False
1323
        content.revision_type = ActionDescription.UNDELETION
1324
1325
    def get_preview_page_nb(self, revision_id: int) -> int:
1326
        file_path = self.get_one_revision_filepath(revision_id)
1327
        nb_pages = self.preview_manager.get_page_nb(file_path)
1328
        return nb_pages
1329
1330
    def mark_read__all(
1331
            self,
1332
            read_datetime: datetime=None,
1333
            do_flush: bool=True,
1334
            recursive: bool=True
1335
    ) -> None:
1336
        """
1337
        Read content of all workspace visible for the user.
1338
        :param read_datetime: date of readigin
1339
        :param do_flush: flush database
1340
        :param recursive: mark read subcontent too
1341
        :return: nothing
1342
        """
1343
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive) # nopep8
1344
1345
    def mark_read__workspace(self,
1346
                       workspace : Workspace,
1347
                       read_datetime: datetime=None,
1348
                       do_flush: bool=True,
1349
                       recursive: bool=True
1350
    ) -> None:
1351
        """
1352
        Read content of a workspace visible for the user.
1353
        :param read_datetime: date of readigin
1354
        :param do_flush: flush database
1355
        :param recursive: mark read subcontent too
1356
        :return: nothing
1357
        """
1358
        itemset = self.get_last_active(workspace)
1359
        for item in itemset:
1360
            if item.has_new_information_for(self._user):
1361
                self.mark_read(item, read_datetime, do_flush, recursive)
1362
1363
    def mark_read(
1364
            self,
1365
            content: Content,
1366
            read_datetime: datetime=None,
1367
            do_flush: bool=True,
1368
            recursive: bool=True
1369
    ) -> Content:
1370
        """
1371
        Read content for the user.
1372
        :param read_datetime: date of readigin
1373
        :param do_flush: flush database
1374
        :param recursive: mark read subcontent too
1375
        :return: nothing
1376
        """
1377
        assert self._user
1378
        assert content
1379
1380
        # The algorithm is:
1381
        # 1. define the read datetime
1382
        # 2. update all revisions related to current Content
1383
        # 3. do the same for all child revisions
1384
        #    (ie parent_id is content_id of current content)
1385
1386
        if not read_datetime:
1387
            read_datetime = datetime.datetime.now()
1388
1389
        viewed_revisions = self._session.query(ContentRevisionRO) \
1390
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1391
1392
        for revision in viewed_revisions:
1393
            revision.read_by[self._user] = read_datetime
1394
1395
        if recursive:
1396
            # mark read :
1397
            # - all children
1398
            # - parent stuff (if you mark a comment as read,
1399
            #                 then you have seen the parent)
1400
            # - parent comments
1401
            for child in content.get_valid_children():
1402
                self.mark_read(child, read_datetime=read_datetime,
1403
                               do_flush=False)
1404
1405
            if CONTENT_TYPES.Comment.slug == content.type:
1406
                self.mark_read(content.parent, read_datetime=read_datetime,
1407
                               do_flush=False, recursive=False)
1408
                for comment in content.parent.get_comments():
1409
                    if comment != content:
1410
                        self.mark_read(comment, read_datetime=read_datetime,
1411
                                       do_flush=False, recursive=False)
1412
1413
        if do_flush:
1414
            self.flush()
1415
1416
        return content
1417
1418
    def mark_unread(self, content: Content, do_flush=True) -> Content:
1419
        assert self._user
1420
        assert content
1421
1422
        revisions = self._session.query(ContentRevisionRO) \
1423
            .filter(ContentRevisionRO.content_id==content.content_id).all()
1424
1425
        for revision in revisions:
1426
            try:
1427
                del revision.read_by[self._user]
1428
            except KeyError:
1429
                pass
1430
1431
        for child in content.get_valid_children():
1432
            self.mark_unread(child, do_flush=False)
1433
1434
        if do_flush:
1435
            self.flush()
1436
1437
        return content
1438
1439
    def flush(self):
1440
        self._session.flush()
1441
1442
    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
1443
        """
1444
        Save an object, flush the session and set the revision_type property
1445
        :param content:
1446
        :param action_description:
1447
        :return:
1448
        """
1449
        assert action_description is None or action_description in ActionDescription.allowed_values()
1450
1451
        if not action_description:
1452
            # See if the last action has been modified
1453
            if content.revision_type==None or len(get_history(content.revision, 'revision_type'))<=0:
1454
                # The action has not been modified, so we set it to default edition
1455
                action_description = ActionDescription.EDITION
1456
1457
        if action_description:
1458
            content.revision_type = action_description
1459
1460
        if do_flush:
1461
            # INFO - 2015-09-03 - D.A.
1462
            # There are 2 flush because of the use
1463
            # of triggers for content creation
1464
            #
1465
            # (when creating a content, actually this is an insert of a new
1466
            # revision in content_revisions ; so the mark_read operation need
1467
            # to get full real data from database before to be prepared.
1468
1469
            self._session.add(content)
1470
            self._session.flush()
1471
1472
            # TODO - 2015-09-03 - D.A. - Do not use triggers
1473
            # We should create a new ContentRevisionRO object instead of Content
1474
            # This would help managing view/not viewed status
1475
            self.mark_read(content, do_flush=True)
1476
1477
        if do_notify:
1478
            self.do_notify(content)
1479
1480
    def do_notify(self, content: Content):
1481
        """
1482
        Allow to force notification for a given content. By default, it is
1483
        called during the .save() operation
1484
        :param content:
1485
        :return:
1486
        """
1487
        NotifierFactory.create(
1488
            config=self._config,
1489
            current_user=self._user,
1490
            session=self._session,
1491
        ).notify_content_update(content)
1492
1493
    def get_keywords(self, search_string, search_string_separators=None) -> [str]:
1494
        """
1495
        :param search_string: a list of coma-separated keywords
1496
        :return: a list of str (each keyword = 1 entry
1497
        """
1498
1499
        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS
1500
1501
        keywords = []
1502
        if search_string:
1503
            keywords = [keyword.strip() for keyword in re.split(search_string_separators, search_string)]
1504
1505
        return keywords
1506
1507
    def search(self, keywords: [str]) -> Query:
1508
        """
1509
        :return: a sorted list of Content items
1510
        """
1511
1512
        if len(keywords)<=0:
1513
            return None
1514
1515
        filter_group_label = list(Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords)
1516
        filter_group_desc = list(Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords)
1517
        title_keyworded_items = self._hard_filtered_base_query().\
1518
            filter(or_(*(filter_group_label+filter_group_desc))).\
1519
            options(joinedload('children_revisions')).\
1520
            options(joinedload('parent'))
1521
1522
        return title_keyworded_items
1523
1524
    def get_all_types(self) -> typing.List[ContentType]:
1525
        labels = CONTENT_TYPES.endpoint_allowed_types_slug()
1526
        content_types = []
1527
        for label in labels:
1528
            content_types.append(CONTENT_TYPES.get_one_by_slug(label))
1529
1530
        return content_types
1531
1532
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1533
    def exclude_unavailable(
1534
        self,
1535
        contents: typing.List[Content],
1536
    ) -> typing.List[Content]:
1537
        """
1538
        Update and return list with content under archived/deleted removed.
1539
        :param contents: List of contents to parse
1540
        """
1541
        for content in contents[:]:
1542
            if self.content_under_deleted(content) or self.content_under_archived(content):
1543
                contents.remove(content)
1544
        return contents
1545
1546
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1547
    def content_under_deleted(self, content: Content) -> bool:
1548
        if content.parent:
1549
            if content.parent.is_deleted:
1550
                return True
1551
            if content.parent.parent:
1552
                return self.content_under_deleted(content.parent)
1553
        return False
1554
1555
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1556
    def content_under_archived(self, content: Content) -> bool:
1557
        if content.parent:
1558
            if content.parent.is_archived:
1559
                return True
1560
            if content.parent.parent:
1561
                return self.content_under_archived(content.parent)
1562
        return False
1563
1564
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1565
    def find_one_by_unique_property(
1566
            self,
1567
            property_name: str,
1568
            property_value: str,
1569
            workspace: Workspace=None,
1570
    ) -> Content:
1571
        """
1572
        Return Content who contains given property.
1573
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
1574
        contains this property value.
1575
        :param property_name: Name of property
1576
        :param property_value: Value of property
1577
        :param workspace: Workspace who contains Content
1578
        :return: Found Content
1579
        """
1580
        # TODO - 20160602 - Bastien: Should be JSON type query
1581
        # see https://www.compose.io/articles/using-json-extensions-in-\
1582
        # postgresql-from-python-2/
1583
        query = self._base_query(workspace=workspace).filter(
1584
            Content._properties.like(
1585
                '%"{property_name}": "{property_value}"%'.format(
1586
                    property_name=property_name,
1587
                    property_value=property_value,
1588
                )
1589
            )
1590
        )
1591
        return query.one()
1592
1593
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method already needed ?
1594
    def generate_folder_label(
1595
            self,
1596
            workspace: Workspace,
1597
            parent: Content=None,
1598
    ) -> str:
1599
        """
1600
        Generate a folder label
1601
        :param workspace: Future folder workspace
1602
        :param parent: Parent of foture folder (can be None)
1603
        :return: Generated folder name
1604
        """
1605
        _ = self.translator.get_translation
1606
        query = self._base_query(workspace=workspace)\
1607
            .filter(Content.label.ilike('{0}%'.format(
1608
                _('New folder'),
1609
            )))
1610
        if parent:
1611
            query = query.filter(Content.parent == parent)
1612
1613
        return _('New folder {0}').format(
1614
            query.count() + 1,
1615
        )
1616