Passed
Pull Request — develop (#31)
by Bastien
01:43
created

Api.create()   F

Complexity

Conditions 20

Size

Total Lines 74
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 50
dl 0
loc 74
rs 0
c 0
b 0
f 0
cc 20
nop 9

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.
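As a hedged illustration of Extract Method, the sketch below pulls a label-versus-filename decision out of a longer method so that the caller shrinks to one named call. It is not code from the reviewed project: `resolve_label` and `describe_new_content` are hypothetical names chosen for this example.

```python
import os


def resolve_label(label: str, filename: str) -> str:
    """Return the label to store, derived from the filename when one is given.

    Hypothetical helper: it holds the block that would otherwise sit inline,
    under a comment, in the middle of a long method.
    """
    if label and filename:
        raise ValueError('label and filename are mutually exclusive')
    if filename:
        # Drop the extension: 'report.pdf' becomes 'report'.
        return os.path.splitext(filename)[0]
    return label


def describe_new_content(label: str = '', filename: str = '') -> str:
    # The former inline block is now a single, named call.
    return 'creating content labelled {!r}'.format(
        resolve_label(label, filename)
    )
```

The name of the extracted function replaces the comment that would otherwise have explained the inline block.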

Complexity

Complex classes like tracim_backend.lib.core.content.ContentApi.create() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
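A minimal sketch of Extract Class, assuming the cohesive component is the "is this content type allowed here?" logic. `SubContentPolicy` and its methods are hypothetical names, and the plain dict and list inputs stand in for whatever the real parent and workspace objects expose.

```python
import typing


class SubContentPolicy:
    """Hypothetical helper: decides whether a content type may be created."""

    def __init__(
        self,
        parent_allowed: typing.Optional[typing.Dict[str, bool]],
        workspace_allowed: typing.Optional[typing.Iterable[str]],
    ) -> None:
        self._parent_allowed = parent_allowed
        self._workspace_allowed = (
            None if workspace_allowed is None else set(workspace_allowed)
        )

    def allowed_by_parent(self, slug: str) -> bool:
        # No restriction recorded on the parent means everything is allowed.
        if self._parent_allowed is None:
            return True
        # A missing entry and a falsy entry both mean "not allowed".
        return bool(self._parent_allowed.get(slug))

    def allowed_by_workspace(self, slug: str) -> bool:
        # No workspace restriction means everything is allowed.
        if self._workspace_allowed is None:
            return True
        return slug in self._workspace_allowed

    def is_allowed(self, slug: str) -> bool:
        return self.allowed_by_parent(slug) and self.allowed_by_workspace(slug)
```

Each branch that moves into the policy object is one condition fewer in the calling method, and the rules can be tested without building a full content object.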

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, for example Introduce Parameter Object and Preserve Whole Object.
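Introduce Parameter Object is one way to shorten such a list. The sketch below is an assumption-laden illustration, not the project's API: `CreationOptions` is a hypothetical class grouping the optional arguments, and the `create` function here is a stand-in that only returns a summary dict.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CreationOptions:
    """Hypothetical parameter object for the optional arguments of create()."""
    label: str = ''
    filename: str = ''
    do_save: bool = False
    is_temporary: bool = False
    do_notify: bool = True

    def __post_init__(self) -> None:
        # Validate once, where the related values are grouped together.
        if self.label and self.filename:
            raise ValueError('label and filename are mutually exclusive')


def create(
    content_type_slug: str,
    workspace_id: int,
    options: CreationOptions = CreationOptions(),
) -> dict:
    """Stand-in for a real create(): returns a summary of what would be built."""
    return {
        'type': content_type_slug,
        'workspace_id': workspace_id,
        'label': options.label or options.filename,
        'temporary': options.is_temporary,
    }
```

Because the options object is frozen, sharing one default instance between calls is safe, and the mutual-exclusion check no longer needs to be repeated by every function that accepts these values.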

1
# -*- coding: utf-8 -*-
2
import datetime
3
import os
4
import re
5
import typing
6
from contextlib import contextmanager
7
8
import sqlalchemy
9
import transaction
10
from depot.io.utils import FileIntent
11
from depot.manager import DepotManager
12
from preview_generator.exception import UnsupportedMimeType
13
from preview_generator.manager import PreviewManager
14
from sqlalchemy import desc
15
from sqlalchemy import func
16
from sqlalchemy import or_
17
from sqlalchemy.orm import Query
18
from sqlalchemy.orm import aliased
19
from sqlalchemy.orm import joinedload
20
from sqlalchemy.orm.attributes import get_history
21
from sqlalchemy.orm.exc import NoResultFound
22
from sqlalchemy.orm.session import Session
23
from sqlalchemy.sql.elements import and_
24
25
from tracim_backend.app_models.contents import CONTENT_STATUS
26
from tracim_backend.app_models.contents import FOLDER_TYPE
27
from tracim_backend.app_models.contents import CONTENT_TYPES
28
from tracim_backend.app_models.contents import ContentType
29
from tracim_backend.exceptions import ContentLabelAlreadyUsedHere
30
from tracim_backend.exceptions import ContentNotFound
31
from tracim_backend.exceptions import UnavailablePreview
32
from tracim_backend.exceptions import ContentTypeNotExist
33
from tracim_backend.exceptions import EmptyCommentContentNotAllowed
34
from tracim_backend.exceptions import EmptyLabelNotAllowed
35
from tracim_backend.exceptions import PageOfPreviewNotFound
36
from tracim_backend.exceptions import PreviewDimNotAllowed
37
from tracim_backend.exceptions import RevisionDoesNotMatchThisContent
38
from tracim_backend.exceptions import SameValueError
39
from tracim_backend.exceptions import UnallowedSubContent
40
from tracim_backend.exceptions import WorkspacesDoNotMatch
41
from tracim_backend.lib.core.notifications import NotifierFactory
42
from tracim_backend.lib.utils.logger import logger
43
from tracim_backend.lib.utils.translation import DEFAULT_FALLBACK_LANG
44
from tracim_backend.lib.utils.translation import Translator
45
from tracim_backend.lib.utils.utils import cmp_to_key
46
from tracim_backend.lib.utils.utils import current_date_for_filename
47
from tracim_backend.lib.utils.utils import preview_manager_page_format
48
from tracim_backend.models.auth import User
49
from tracim_backend.models.context_models import ContentInContext
50
from tracim_backend.models.context_models import PreviewAllowedDim
51
from tracim_backend.models.context_models import RevisionInContext
52
from tracim_backend.models.data import ActionDescription
53
from tracim_backend.models.data import Content
54
from tracim_backend.models.data import ContentRevisionRO
55
from tracim_backend.models.data import NodeTreeItem
56
from tracim_backend.models.data import RevisionReadStatus
57
from tracim_backend.models.data import UserRoleInWorkspace
58
from tracim_backend.models.data import Workspace
59
from tracim_backend.models.revision_protection import new_revision
60
61
__author__ = 'damien'
62
63
64
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
65
def compare_content_for_sorting_by_type_and_name(
66
        content1: Content,
67
        content2: Content
68
) -> int:
69
    """
70
    :param content1:
71
    :param content2:
72
    :return:    1 if content1 > content2
73
                -1 if content1 < content2
74
                0 if content1 = content2
75
    """
76
77
    if content1.type == content2.type:
78
        if content1.get_label().lower() > content2.get_label().lower():
79
            return 1
80
        elif content1.get_label().lower() < content2.get_label().lower():
81
            return -1
82
        return 0
83
    else:
84
        # TODO - D.A. - 2014-12-02 - Manage Content Types Dynamically
85
        content_type_order = [
86
            CONTENT_TYPES.Folder.slug,
87
            CONTENT_TYPES.Page.slug,
88
            CONTENT_TYPES.Thread.slug,
89
            CONTENT_TYPES.File.slug,
90
        ]
91
92
        content_1_type_index = content_type_order.index(content1.type)
93
        content_2_type_index = content_type_order.index(content2.type)
94
        result = content_1_type_index - content_2_type_index
95
96
        if result < 0:
97
            return -1
98
        elif result > 0:
99
            return 1
100
        else:
101
            return 0
102
103
# TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
104
def compare_tree_items_for_sorting_by_type_and_name(
105
        item1: NodeTreeItem,
106
        item2: NodeTreeItem
107
) -> int:
108
    return compare_content_for_sorting_by_type_and_name(item1.node, item2.node)
109
110
111
class ContentApi(object):
112
113
    SEARCH_SEPARATORS = ',| '
114
    SEARCH_DEFAULT_RESULT_NB = 50
115
116
    # DISPLAYABLE_CONTENTS = (
117
    #     CONTENT_TYPES.Folder.slug,
118
    #     CONTENT_TYPES.File.slug,
119
    #     CONTENT_TYPES.Comment.slug,
120
    #     CONTENT_TYPES.Thread.slug,
121
    #     CONTENT_TYPES.Page.slug,
122
    #     CONTENT_TYPES.Page.slugLegacy,
123
    #     ContentType.MarkdownPage,
124
    # )
125
126
    def __init__(
127
            self,
128
            session: Session,
129
            current_user: typing.Optional[User],
130
            config,
131
            show_archived: bool = False,
132
            show_deleted: bool = False,
133
            show_temporary: bool = False,
134
            show_active: bool = True,
135
            all_content_in_treeview: bool = True,
136
            force_show_all_types: bool = False,
137
            disable_user_workspaces_filter: bool = False,
138
    ) -> None:
139
        self._session = session
140
        self._user = current_user
141
        self._config = config
142
        self._user_id = current_user.user_id if current_user else None
143
        self._show_archived = show_archived
144
        self._show_deleted = show_deleted
145
        self._show_temporary = show_temporary
146
        self._show_active = show_active
147
        self._show_all_type_of_contents_in_treeview = all_content_in_treeview
148
        self._force_show_all_types = force_show_all_types
149
        self._disable_user_workspaces_filter = disable_user_workspaces_filter
150
        self.preview_manager = PreviewManager(self._config.PREVIEW_CACHE_DIR, create_folder=True)  # nopep8
151
        default_lang = None
152
        if self._user:
153
            default_lang = self._user.lang
154
        if not default_lang:
155
            default_lang = DEFAULT_FALLBACK_LANG
156
        self.translator = Translator(app_config=self._config, default_lang=default_lang)  # nopep8
157
158
    @contextmanager
159
    def show(
160
            self,
161
            show_archived: bool=False,
162
            show_deleted: bool=False,
163
            show_temporary: bool=False,
164
    ) -> typing.Generator['ContentApi', None, None]:
165
        """
166
        Use this method as context manager to update show_archived,
167
        show_deleted and show_temporary properties during context.
168
        :param show_archived: show archived contents
169
        :param show_deleted:  show deleted contents
170
        :param show_temporary:  show temporary contents
171
        """
172
        previous_show_archived = self._show_archived
173
        previous_show_deleted = self._show_deleted
174
        previous_show_temporary = self._show_temporary
175
176
        try:
177
            self._show_archived = show_archived
178
            self._show_deleted = show_deleted
179
            self._show_temporary = show_temporary
180
            yield self
181
        finally:
182
            self._show_archived = previous_show_archived
183
            self._show_deleted = previous_show_deleted
184
            self._show_temporary = previous_show_temporary
185
186
    def get_content_in_context(self, content: Content) -> ContentInContext:
187
        return ContentInContext(content, self._session, self._config, self._user)  # nopep8
188
189
    def get_revision_in_context(self, revision: ContentRevisionRO) -> RevisionInContext:  # nopep8
190
        # TODO - G.M - 2018-06-173 - create revision in context object
191
        return RevisionInContext(revision, self._session, self._config, self._user) # nopep8
192
    
193
    def _get_revision_join(self) -> sqlalchemy.sql.elements.BooleanClauseList:
194
        """
195
        Return the Content/ContentRevision query join condition
196
        :return: Content/ContentRevision query join condition
197
        """
198
        return and_(Content.id == ContentRevisionRO.content_id,
199
                    ContentRevisionRO.revision_id == self._session.query(
200
                        ContentRevisionRO.revision_id)
201
                    .filter(ContentRevisionRO.content_id == Content.id)
202
                    .order_by(ContentRevisionRO.revision_id.desc())
203
                    .limit(1)
204
                    .correlate(Content))
205
206
    def get_canonical_query(self) -> Query:
207
        """
208
        Return the Content/ContentRevision base query that joins these tables on the latest revision.
209
        :return: Content/ContentRevision Query
210
        """
211
        return self._session.query(Content)\
212
            .join(ContentRevisionRO, self._get_revision_join())
213
214
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
215
    @classmethod
216
    def sort_tree_items(
217
        cls,
218
        content_list: typing.List[NodeTreeItem],
219
    )-> typing.List[NodeTreeItem]:
220
        news = []
221
        for item in content_list:
222
            news.append(item)
223
224
        content_list.sort(key=cmp_to_key(
225
            compare_tree_items_for_sorting_by_type_and_name,
226
        ))
227
228
        return content_list
229
230
    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
231
    @classmethod
232
    def sort_content(
233
        cls,
234
        content_list: typing.List[Content],
235
    ) -> typing.List[Content]:
236
        content_list.sort(key=cmp_to_key(compare_content_for_sorting_by_type_and_name))
237
        return content_list
238
239
    def __real_base_query(
240
        self,
241
        workspace: Workspace = None,
242
    ) -> Query:
243
        result = self.get_canonical_query()
244
245
        # Exclude non displayable types
246
        if not self._force_show_all_types:
247
            result = result.filter(Content.type.in_(CONTENT_TYPES.query_allowed_types_slugs()))
248
249
        if workspace:
250
            result = result.filter(Content.workspace_id == workspace.workspace_id)
251
252
        # Security layer: if user provided, filter
253
        # with user workspaces privileges
254
        if self._user and not self._disable_user_workspaces_filter:
255
            user = self._session.query(User).get(self._user_id)
256
            # Filter according to user workspaces
257
            workspace_ids = [r.workspace_id for r in user.roles \
258
                             if r.role>=UserRoleInWorkspace.READER]
259
            result = result.filter(or_(
260
                Content.workspace_id.in_(workspace_ids),
261
                # And allow access to non workspace document when he is owner
262
                and_(
263
                    Content.workspace_id == None,
264
                    Content.owner_id == self._user_id,
265
                )
266
            ))
267
268
        return result
269
270
    def _base_query(self, workspace: Workspace=None) -> Query:
271
        result = self.__real_base_query(workspace)
272
273
        if not self._show_active:
274
            result = result.filter(or_(
275
                Content.is_deleted==True,
276
                Content.is_archived==True,
277
            ))
278
        if not self._show_deleted:
279
            result = result.filter(Content.is_deleted==False)
280
281
        if not self._show_archived:
282
            result = result.filter(Content.is_archived==False)
283
284
        if not self._show_temporary:
285
            result = result.filter(Content.is_temporary==False)
286
287
        return result
288
289
    def __revisions_real_base_query(
290
        self,
291
        workspace: Workspace=None,
292
    ) -> Query:
293
        result = self._session.query(ContentRevisionRO)
294
295
        # Exclude non displayable types
296
        if not self._force_show_all_types:
297
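            # DISPLAYABLE_CONTENTS is commented out above, so use the same
            # allowed-types lookup as __real_base_query.
            result = result.filter(Content.type.in_(CONTENT_TYPES.query_allowed_types_slugs()))  # nopep8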
298
299
        if workspace:
300
            result = result.filter(ContentRevisionRO.workspace_id==workspace.workspace_id)
301
302
        if self._user:
303
            user = self._session.query(User).get(self._user_id)
304
            # Filter according to user workspaces
305
            workspace_ids = [r.workspace_id for r in user.roles \
306
                             if r.role>=UserRoleInWorkspace.READER]
307
            result = result.filter(ContentRevisionRO.workspace_id.in_(workspace_ids))
308
309
        return result
310
311
    def _revisions_base_query(
312
        self,
313
        workspace: Workspace=None,
314
    ) -> Query:
315
        result = self.__revisions_real_base_query(workspace)
316
317
        if not self._show_deleted:
318
            result = result.filter(ContentRevisionRO.is_deleted==False)
319
320
        if not self._show_archived:
321
            result = result.filter(ContentRevisionRO.is_archived==False)
322
323
        if not self._show_temporary:
324
            result = result.filter(Content.is_temporary==False)
325
326
        return result
327
328
    def _hard_filtered_base_query(
329
        self,
330
        workspace: Workspace=None,
331
    ) -> Query:
332
        """
333
        If set to True, then filtering on is_deleted and is_archived will also
334
        filter parent properties. This is required for search() function which
335
        also search in comments (for example) which may be 'not deleted' while
336
        the associated content is deleted
337
338
        :param hard_filtering:
339
        :return:
340
        """
341
        result = self.__real_base_query(workspace)
342
343
        if not self._show_deleted:
344
            parent = aliased(Content)
345
            result = result.join(parent, Content.parent).\
346
                filter(Content.is_deleted==False).\
347
                filter(parent.is_deleted==False)
348
349
        if not self._show_archived:
350
            parent = aliased(Content)
351
            result = result.join(parent, Content.parent).\
352
                filter(Content.is_archived==False).\
353
                filter(parent.is_archived==False)
354
355
        if not self._show_temporary:
356
            parent = aliased(Content)
357
            result = result.join(parent, Content.parent). \
358
                filter(Content.is_temporary == False). \
359
                filter(parent.is_temporary == False)
360
361
        return result
362
363
    def get_base_query(
364
        self,
365
        workspace: Workspace,
366
    ) -> Query:
367
        return self._base_query(workspace)
368
369
    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
370
    # def get_child_folders(self, parent: Content=None, workspace: Workspace=None, filter_by_allowed_content_types: list=[], removed_item_ids: list=[], allowed_node_types=None) -> typing.List[Content]:
371
    #     """
372
    #     This method returns child items (folders or items) for left bar treeview.
373
    # 
374
    #     :param parent:
375
    #     :param workspace:
376
    #     :param filter_by_allowed_content_types:
377
    #     :param removed_item_ids:
378
    #     :param allowed_node_types: This parameter allow to hide folders for which the given type of content is not allowed.
379
    #            For example, if you want to move a Page from a folder to another, you should show only folders that accept pages
380
    #     :return:
381
    #     """
382
    #     filter_by_allowed_content_types = filter_by_allowed_content_types or []  # FDV
383
    #     removed_item_ids = removed_item_ids or []  # FDV
384
    # 
385
    #     if not allowed_node_types:
386
    #         allowed_node_types = [CONTENT_TYPES.Folder.slug]
387
    #     elif allowed_node_types==CONTENT_TYPES.Any_SLUG:
388
    #         allowed_node_types = ContentType.all()
389
    # 
390
    #     parent_id = parent.content_id if parent else None
391
    #     folders = self._base_query(workspace).\
392
    #         filter(Content.parent_id==parent_id).\
393
    #         filter(Content.type.in_(allowed_node_types)).\
394
    #         filter(Content.content_id.notin_(removed_item_ids)).\
395
    #         all()
396
    # 
397
    #     if not filter_by_allowed_content_types or \
398
    #                     len(filter_by_allowed_content_types)<=0:
399
    #         # Standard case for the left treeview: we want to show all contents
400
    #         # in the left treeview... so we still filter because for example
401
    #         # comments must not appear in the treeview
402
    #         return [folder for folder in folders \
403
    #                 if folder.type in ContentType.allowed_types_for_folding()]
404
    # 
405
    #     # Now this is a case of Folders only (used for moving content)
406
    #     # When moving a content, you must get only folders that allow to be filled
407
    #     # with the type of content you want to move
408
    #     result = []
409
    #     for folder in folders:
410
    #         for allowed_content_type in filter_by_allowed_content_types:
411
    # 
412
    #             is_folder = folder.type == CONTENT_TYPES.Folder.slug
413
    #             content_type__allowed = folder.properties['allowed_content'][allowed_content_type] == True
414
    # 
415
    #             if is_folder and content_type__allowed:
416
    #                 result.append(folder)
417
    #                 break
418
    # 
419
    #     return result
420
421
    def _is_content_label_free(
422
            self,
423
            label: str,
424
            workspace: Workspace,
425
            parent: Content = None,
426
            exclude_content_id: int = None,
427
    ) -> bool:
428
        """
429
        Check if content label is free
430
        :param label: content label
431
        :param workspace: workspace of the content
432
        :param parent: parent of the content
433
        :param exclude_content_id: exclude a specific content_id (useful
434
        to verify  other content for content update)
435
        :return: True if content label is available
436
        """
437
        # INFO - G.M - 2018-09-04 - Method should not be used by special content
438
        # with empty label like comment.
439
        assert label
440
        assert workspace
441
        query = self.get_base_query(workspace)
442
443
        if parent:
444
            query = query.filter(Content.parent_id == parent.content_id)
445
        else:
446
            query = query.filter(Content.parent_id == None)
447
448
        if exclude_content_id:
449
            query = query.filter(Content.content_id != exclude_content_id)
450
        query = query.filter(Content.workspace_id == workspace.workspace_id)
451
452
        nb_content_with_the_label = query.filter(Content.label == label).count()
453
        if nb_content_with_the_label == 0:
454
            return True
455
        elif nb_content_with_the_label == 1:
456
            return False
457
        else:
458
            critical_error_text = 'Something is wrong in the database ! '\
459
                                  'Content label should be unique ' \
460
                                  'in a same folder in database ' \
461
                                  'but you have {nb} content with ' \
462
                                  'label {label} in workspace {workspace_id}'
463
464
            critical_error_text = critical_error_text.format(
465
                nb=nb_content_with_the_label,
466
                label=label,
467
                workspace_id=workspace.workspace_id,
468
            )
469
            if parent:
470
                critical_error_text = '{text} and parent as content {parent_id}'.format(  # nopep8
471
                    text=critical_error_text,
472
                    parent_id=parent.content_id
473
                )
474
            logger.critical(self, critical_error_text)
475
            return False
476
477
    def _is_content_label_free_or_raise(
478
        self,
479
        label: str,
480
        workspace: Workspace,
481
        parent: Content = None,
482
        exclude_content_id: int = None,
483
    ) -> None:
484
        """
485
        Same as _is_content_label_free but raise exception instead of
486
        returning boolean if content label is already used
487
        """
488
        if not self._is_content_label_free(
489
            label,
490
            workspace,
491
            parent,
492
            exclude_content_id
493
        ):
494
            text = 'A Content already exists with the same label {label}' \
495
                   ' in workspace {workspace_id}'.format(
496
                      label=label,
497
                      workspace_id=workspace.workspace_id,
498
                   )
499
            if parent:
500
                text = '{text} and parent as content {parent_id}'.format(
501
                    text=text,
502
                    parent_id=parent.content_id
503
                )
504
            raise ContentLabelAlreadyUsedHere(text)
505
506
    def create(
            self,
            content_type_slug: str,
            workspace: Workspace,
            parent: Content = None,
            label: str = '',
            filename: str = '',
            do_save: bool = False,
            is_temporary: bool = False,
            do_notify: bool = True,
    ) -> Content:
        # TODO - G.M - 2018-07-16 - raise Exception instead of assert
        assert content_type_slug != CONTENT_TYPES.Any_SLUG
        assert not (label and filename)

        if content_type_slug == FOLDER_TYPE and not label:
            label = self.generate_folder_label(workspace, parent)

        # TODO BS 2018-08-13: Although workspace is required, create_comment
        # can call this method with None. create_comment must be updated to
        # require the workspace.
        if not workspace:
            workspace = parent.workspace

        content_type = CONTENT_TYPES.get_one_by_slug(content_type_slug)
        if parent and parent.properties and 'allowed_content' in parent.properties:  # nopep8
            if content_type.slug not in parent.properties['allowed_content'] or not parent.properties['allowed_content'][content_type.slug]:  # nopep8
                raise UnallowedSubContent('SubContent of type {subcontent_type} not allowed in content {content_id}'.format(  # nopep8
                    subcontent_type=content_type.slug,
                    content_id=parent.content_id,
                ))

        if workspace:
            if content_type.slug not in workspace.get_allowed_content_types():
                raise UnallowedSubContent(
                    'SubContent of type {subcontent_type} not allowed in workspace {workspace_id}'.format(  # nopep8
                        subcontent_type=content_type.slug,
                        workspace_id=workspace.workspace_id,
                    )
                )
        if filename:
            label = os.path.splitext(filename)[0]
        if label:
            self._is_content_label_free_or_raise(
                label,
                workspace,
                parent,
            )

        content = Content()
        if filename:
            # INFO - G.M - 2018-07-04 - Setting file_name automatically
            # sets label and file_extension, so pass the full filename
            # (passing the stripped label would lose the extension).
            content.file_name = filename
        elif label:
            content.label = label
        else:
            if content_type_slug == CONTENT_TYPES.Comment.slug:
                # INFO - G.M - 2018-07-16 - Default label for comments is
                # empty string.
                content.label = ''
            else:
                raise EmptyLabelNotAllowed('Content of this type should have a valid label')  # nopep8

        content.owner = self._user
        content.parent = parent

        content.workspace = workspace
        content.type = content_type.slug
        content.is_temporary = is_temporary
        content.revision_type = ActionDescription.CREATION

        if content.type in (
                CONTENT_TYPES.Page.slug,
                CONTENT_TYPES.Thread.slug,
        ):
            content.file_extension = '.html'

        if do_save:
            self._session.add(content)
            self.save(content, ActionDescription.CREATION, do_notify=do_notify)
        return content
580
581
    def create_comment(self, workspace: Workspace = None, parent: Content = None, content: str = '', do_save: bool = False) -> Content:  # nopep8
582
        # TODO: check parent allowed_type and workspace allowed_type
583
        assert parent and parent.type != FOLDER_TYPE
584
        if not content:
585
            raise EmptyCommentContentNotAllowed()
586
587
        item = self.create(
588
            content_type_slug=CONTENT_TYPES.Comment.slug,
589
            workspace=workspace,
590
            parent=parent,
591
            do_notify=False,
592
            do_save=False,
593
            label='',
594
        )
595
        item.description = content
596
        item.revision_type = ActionDescription.COMMENT
597
598
        if do_save:
599
            self.save(item, ActionDescription.COMMENT)
600
        return item
601
602
    def get_one_from_revision(self, content_id: int, content_type: str, workspace: Workspace=None, revision_id=None) -> Content:
603
        """
604
        This method is a hack to convert a node revision item into a node
605
        :param content_id:
606
        :param content_type:
607
        :param workspace:
608
        :param revision_id:
609
        :return:
610
        """
611
612
        content = self.get_one(content_id, content_type, workspace)
613
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id==revision_id).one()
614
615
        if revision.content_id==content.content_id:
616
            content.revision_to_serialize = revision.revision_id
617
        else:
618
            raise ValueError('Revision not found for given content')
619
620
        return content
621
622
    def get_one(self, content_id: int, content_type: str, workspace: Workspace=None, parent: Content=None) -> Content:
623
624
        if not content_id:
625
            return None
626
627
        base_request = self._base_query(workspace).filter(Content.content_id==content_id)
628
629
        if content_type!=CONTENT_TYPES.Any_SLUG:
630
            base_request = base_request.filter(Content.type==content_type)
631
632
        if parent:
633
            base_request = base_request.filter(Content.parent_id==parent.content_id)  # nopep8
634
635
        try:
636
            content = base_request.one()
637
        except NoResultFound as exc:
638
            # TODO - G.M - 2018-07-16 - Add better support for all different
639
            # error cases that can happen here
640
            # like content doesn't exist, wrong parent, wrong content_type, wrong workspace,
641
            # wrong access to this workspace, wrong base filter according
642
            # to content_status.
643
            raise ContentNotFound('Content "{}" not found in database'.format(content_id)) from exc  # nopep8
644
        return content
645
646
    def get_one_revision(self, revision_id: int = None, content: Content= None) -> ContentRevisionRO:  # nopep8
647
        """
648
        This method allows us to get any revision directly by its id
649
        :param revision_id: The content's revision's id that we want to return
650
        :param content: The content related to the revision, if None do not
651
        check if revision is related to this content.
652
        :return: The ContentRevisionRO matching revision_id
653
        """
654
        assert revision_id is not None  # DYN_REMOVE
655
656
        revision = self._session.query(ContentRevisionRO).filter(ContentRevisionRO.revision_id == revision_id).one()
657
        if content and revision.content_id != content.content_id:
658
            raise RevisionDoesNotMatchThisContent(
659
                'revision {revision_id} is not a revision of content {content_id}'.format(  # nopep8
660
                    revision_id=revision.revision_id,
661
                    content_id=content.content_id,
662
                    )
663
            )
664
        return revision
665
666
    # INFO - A.P - 2017-07-03 - python file object getter
667
    # in case we build a version of preview manager that allows pythonic
668
    # access to files
669
    # def get_one_revision_file(self, revision_id: int = None):
670
    #     """
671
    #     This function allows us to directly get a Python file object from its
672
    #     revision identifier.
673
    #     :param revision_id: The revision id of the file we want to return
674
    #     :return: The corresponding Python file object
675
    #     """
676
    #     revision = self.get_one_revision(revision_id)
677
    #     return DepotManager.get().get(revision.depot_file)
678
679
    def get_one_revision_filepath(self, revision_id: int = None) -> str:
680
        """
681
        This method allows us to directly get a file path from its revision
682
        identifier.
683
        :param revision_id: The revision id of the filepath we want to return
684
        :return: The corresponding filepath
685
        """
686
        revision = self.get_one_revision(revision_id)
687
        depot = DepotManager.get()
688
        depot_stored_file = depot.get(revision.depot_file)  # type: StoredFile
689
        depot_file_path = depot_stored_file._file_path  # type: str
690
        return depot_file_path
691
692
    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
693
    def get_one_by_label_and_parent(
694
            self,
695
            content_label: str,
696
            content_parent: Content=None,
697
    ) -> Content:
698
        """
699
        This method let us request the database to obtain a Content with its name and parent
700
        :param content_label: Either the content's label or the content's filename if the label is None
701
        :param content_parent: The parent's content
702
        :param workspace: The workspace's content
703
        :return The corresponding Content
704
        """
705
        workspace = content_parent.workspace if content_parent else None
706
        query = self._base_query(workspace)
707
        parent_id = content_parent.content_id if content_parent else None
708
        query = query.filter(Content.parent_id == parent_id)
709
710
        file_name, file_extension = os.path.splitext(content_label)
711
712
        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
713
        # rewritten in order to avoid content_type hardcoded code there
714
        return query.filter(
715
            or_(
716
                and_(
717
                    Content.type == CONTENT_TYPES.File.slug,
718
                    Content.label == file_name,
719
                    Content.file_extension == file_extension,
720
                ),
721
                and_(
722
                    Content.type == CONTENT_TYPES.Thread.slug,
723
                    Content.label == file_name,
724
                ),
725
                and_(
726
                    Content.type == CONTENT_TYPES.Page.slug,
727
                    Content.label == file_name,
728
                ),
729
                and_(
730
                    Content.type == CONTENT_TYPES.Folder.slug,
731
                    Content.label == content_label,
732
                ),
733
            )
734
        ).one()

    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
    def get_one_by_label_and_parent_labels(
            self,
            content_label: str,
            workspace: Workspace,
            content_parent_labels: typing.List[str]=None,
    ):
        """
        Return content with its label, workspace and parent labels (optional)
        :param content_label: label of content (label or file_name)
        :param workspace: workspace containing all of this
        :param content_parent_labels: Ordered list of labels representing path
            of folder (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :return: Found Content
        """
        query = self._base_query(workspace)
        parent_folder = None

        # Grab content parent folder if parent path given
        if content_parent_labels:
            parent_folder = self.get_folder_with_workspace_path_labels(
                content_parent_labels,
                workspace,
            )

        # Build the query to find content by label
        content_query = self.filter_query_for_content_label_as_path(
            query=query,
            content_label_as_file=content_label,
        )

        # Modify query to apply parent folder filter if any
        if parent_folder:
            content_query = content_query.filter(
                Content.parent_id == parent_folder.content_id,
            )
        else:
            content_query = content_query.filter(
                Content.parent_id == None,
            )

        # Filter with workspace
        content_query = content_query.filter(
            Content.workspace_id == workspace.workspace_id,
        )

        # Return the content
        return content_query\
            .order_by(
                Content.revision_id.desc(),
            )\
            .one()

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def get_folder_with_workspace_path_labels(
            self,
            path_labels: typing.List[str],
            workspace: Workspace,
    ) -> Content:
        """
        Return a Content folder for given relative path.
        TODO BS 20161124: Not safe if the web interface allows duplicate folder names
        :param path_labels: List of labels representing path of folder
        (without workspace label).
        E.g.: ['foo', 'bar'] for complete path /Workspace1/foo/bar folder
        :param workspace: workspace of folders
        :return: Content folder
        """
        query = self._base_query(workspace)
        folder = None

        for label in path_labels:
            # Filter query on label
            folder_query = query \
                .filter(
                    Content.type == CONTENT_TYPES.Folder.slug,
                    Content.label == label,
                    Content.workspace_id == workspace.workspace_id,
                )

            # Search into parent folder (if already deep)
            if folder:
                folder_query = folder_query\
                    .filter(
                        Content.parent_id == folder.content_id,
                    )
            else:
                folder_query = folder_query \
                    .filter(Content.parent_id == None)

            # Get the first corresponding folder
            folder = folder_query \
                .order_by(Content.revision_id.desc()) \
                .one()

        return folder

    # TODO - G.M - 2018-09-04 - [Cleanup] Is this method still needed?
    def filter_query_for_content_label_as_path(
            self,
            query: Query,
            content_label_as_file: str,
            is_case_sensitive: bool = False,
    ) -> Query:
        """
        Apply normalised filters to find the Content matching the given label.
        :param query: query to modify
        :param content_label_as_file: label in its file form,
        see Content.get_label_as_file().
        :param is_case_sensitive: whether the comparison is case sensitive
        :return: modified query
        """
        file_name, file_extension = os.path.splitext(content_label_as_file)

        label_filter = Content.label == content_label_as_file
        file_name_filter = Content.label == file_name
        file_extension_filter = Content.file_extension == file_extension

        if not is_case_sensitive:
            label_filter = func.lower(Content.label) == \
                           func.lower(content_label_as_file)
            file_name_filter = func.lower(Content.label) == \
                               func.lower(file_name)
            file_extension_filter = func.lower(Content.file_extension) == \
                                    func.lower(file_extension)

        # TODO - G.M - 2018-09-04 - If this method is needed, it should be
        # rewritten to avoid hardcoding content types here
        return query.filter(or_(
            and_(
                Content.type == CONTENT_TYPES.File.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == CONTENT_TYPES.Thread.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == CONTENT_TYPES.Page.slug,
                file_name_filter,
                file_extension_filter,
            ),
            and_(
                Content.type == CONTENT_TYPES.Folder.slug,
                label_filter,
            ),
        ))

    def get_pdf_preview_path(
            self,
            content_id: int,
            revision_id: int,
            page_number: int
    ) -> str:
        """
        Get pdf preview of revision of content
        :param content_id: id of content
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage content
        :return: preview_path as string
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            if page_number >= self.preview_manager.get_page_nb(file_path):
                raise PageOfPreviewNotFound(
                    'page_number {page_number} of content {content_id} does not exist'.format(
                        page_number=page_number,
                        content_id=content_id
                    ),
                )
            # This is a pdf preview, so name the local variable accordingly
            pdf_preview_path = self.preview_manager.get_pdf_preview(
                file_path,
                page=page_number
            )
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            ) from exc
        return pdf_preview_path

    def get_full_pdf_preview_path(self, revision_id: int) -> str:
        """
        Get full (multiple page) pdf preview of revision of content
        :param revision_id: id of revision
        :return: path of the full pdf preview of this revision
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            pdf_preview_path = self.preview_manager.get_pdf_preview(file_path)
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for revision {}'.format(revision_id)
            ) from exc
        return pdf_preview_path

    def get_jpg_preview_allowed_dim(self) -> PreviewAllowedDim:
        """
        Get jpg preview allowed dimensions and strict bool param.
        """
        return PreviewAllowedDim(
            self._config.PREVIEW_JPG_RESTRICTED_DIMS,
            self._config.PREVIEW_JPG_ALLOWED_DIMS,
        )

    def get_jpg_preview_path(
        self,
        content_id: int,
        revision_id: int,
        page_number: int,
        width: int = None,
        height: int = None,
    ) -> str:
        """
        Get jpg preview of revision of content
        :param content_id: id of content
        :param revision_id: id of content revision
        :param page_number: page number of the preview, useful for multipage content
        :param width: width in pixel
        :param height: height in pixel
        :return: preview_path as string
        """
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            page_number = preview_manager_page_format(page_number)
            if page_number >= self.preview_manager.get_page_nb(file_path):
                raise PageOfPreviewNotFound(
                    'page {page_number} of revision {revision_id} of content {content_id} does not exist'.format(  # nopep8
                        page_number=page_number,
                        revision_id=revision_id,
                        content_id=content_id,
                    ),
                )
            if not width and not height:
                width = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].width
                height = self._config.PREVIEW_JPG_ALLOWED_DIMS[0].height

            allowed_dim = False
            for preview_dim in self._config.PREVIEW_JPG_ALLOWED_DIMS:
                if width == preview_dim.width and height == preview_dim.height:
                    allowed_dim = True
                    break

            if not allowed_dim and self._config.PREVIEW_JPG_RESTRICTED_DIMS:
                raise PreviewDimNotAllowed(
                    'Size {width}x{height} is not allowed for jpeg preview'.format(
                        width=width,
                        height=height,
                    )
                )
            jpg_preview_path = self.preview_manager.get_jpeg_preview(
                file_path,
                page=page_number,
                width=width,
                height=height,
            )
        except UnsupportedMimeType as exc:
            raise UnavailablePreview(
                'No preview available for content {}, revision {}'.format(content_id, revision_id)  # nopep8
            ) from exc
        return jpg_preview_path

    def _get_all_query(
        self,
        parent_id: int = None,
        content_type_slug: str = CONTENT_TYPES.Any_SLUG,
        workspace: Workspace = None
    ) -> Query:
        """
        Extended filter for better "get all data" query
        :param parent_id: filter by parent_id
        :param content_type_slug: filter by content_type slug
        :param workspace: filter by workspace
        :return: the filtered query
        """
        assert parent_id is None or isinstance(parent_id, int)
        assert content_type_slug is not None
        resultset = self._base_query(workspace)

        if content_type_slug != CONTENT_TYPES.Any_SLUG:
            # INFO - G.M - 2018-07-05 - convert with
            #  content type object to support legacy slug
            content_type_object = CONTENT_TYPES.get_one_by_slug(content_type_slug)
            all_slug_alias = [content_type_object.slug]
            if content_type_object.slug_alias:
                all_slug_alias.extend(content_type_object.slug_alias)
            resultset = resultset.filter(Content.type.in_(all_slug_alias))

        if parent_id:
            resultset = resultset.filter(Content.parent_id == parent_id)
        if parent_id == 0 or parent_id is False:
            resultset = resultset.filter(Content.parent_id == None)

        return resultset

    def get_all(self, parent_id: int=None, content_type: str=CONTENT_TYPES.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
        return self._get_all_query(parent_id, content_type, workspace).all()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # def get_children(self, parent_id: int, content_types: list, workspace: Workspace=None) -> typing.List[Content]:
    #     """
    #     Return the children of parent_id that have one of the given content_types
    #     :param parent_id: parent id
    #     :param content_types: list of types
    #     :param workspace: workspace filter
    #     :return: list of content
    #     """
    #     resultset = self._base_query(workspace)
    #     resultset = resultset.filter(Content.type.in_(content_types))
    #
    #     if parent_id:
    #         resultset = resultset.filter(Content.parent_id==parent_id)
    #     if parent_id is False:
    #         resultset = resultset.filter(Content.parent_id == None)
    #
    #     return resultset.all()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # TODO find another name to filter on is_deleted / is_archived
    def get_all_with_filter(self, parent_id: int=None, content_type: str=CONTENT_TYPES.Any_SLUG, workspace: Workspace=None) -> typing.List[Content]:
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        resultset = self._base_query(workspace)

        if content_type != CONTENT_TYPES.Any_SLUG:
            resultset = resultset.filter(Content.type == content_type)

        resultset = resultset.filter(Content.is_deleted == self._show_deleted)
        resultset = resultset.filter(Content.is_archived == self._show_archived)
        resultset = resultset.filter(Content.is_temporary == self._show_temporary)

        resultset = resultset.filter(Content.parent_id == parent_id)

        return resultset.all()

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
        assert content_type is not None  # DYN_REMOVE

        resultset = self._base_query(workspace)

        if content_type != CONTENT_TYPES.Any_SLUG:
            resultset = resultset.filter(Content.type == content_type)

        return resultset.all()

    def get_last_unread(self, parent_id: int, content_type: str,
                        workspace: Workspace=None, limit=10) -> typing.List[Content]:
        # NOTE: this body stops after building the subquery, so the method
        # implicitly returns None despite its annotation. The full version
        # of this method is kept commented out further down in this class.
        assert parent_id is None or isinstance(parent_id, int)  # DYN_REMOVE
        assert content_type is not None  # DYN_REMOVE
        assert isinstance(content_type, str)  # DYN_REMOVE

        read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
            .filter(RevisionReadStatus.user_id == self._user_id)

        not_read_revisions = self._revisions_base_query(workspace) \
            .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
            .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
            .filter(Workspace.is_deleted.is_(False)) \
            .subquery()

    # TODO - G.M - 2018-07-17 - [Cleanup] Drop this method if unneeded
    # def get_all_without_exception(self, content_type: str, workspace: Workspace=None) -> typing.List[Content]:
    #     assert content_type is not None# DYN_REMOVE
    #
    #     resultset = self._base_query(workspace)
    #
    #     if content_type != CONTENT_TYPES.Any_SLUG:
    #         resultset = resultset.filter(Content.type==content_type)
    #
    #     return resultset.all()

    def get_last_active(
            self,
            workspace: Workspace=None,
            limit: typing.Optional[int]=None,
            before_content: typing.Optional[Content]= None,
            content_ids: typing.Optional[typing.List[int]] = None,
    ) -> typing.List[Content]:
        """
        Get the list of contents sorted by last update
        (last modification of the content itself or of one of its comments)
        :param workspace: Workspace to check
        :param limit: maximum number of elements to return
        :param before_content: if given, return only contents that were
         last updated before this content.
        :param content_ids: restrict selection to some content ids and
        related Comments
        :return: list of content
        """

        resultset = self._get_all_query(
            workspace=workspace,
        )
        if content_ids:
            resultset = resultset.filter(
                or_(
                    Content.content_id.in_(content_ids),
                    and_(
                        Content.parent_id.in_(content_ids),
                        Content.type == CONTENT_TYPES.Comment.slug
                    )
                )
            )

        resultset = resultset.order_by(desc(Content.updated))

        active_contents = []
        too_recent_content = []
        before_content_find = False
        for content in resultset:
            related_active_content = None
            if CONTENT_TYPES.Comment.slug == content.type:
                related_active_content = content.parent
            else:
                related_active_content = content

            # INFO - G.M - 2018-08-10 - re-apply general filters here to avoid
            # issue with comments
            if not self._show_deleted and related_active_content.is_deleted:
                continue
            if not self._show_archived and related_active_content.is_archived:
                continue

            if related_active_content not in active_contents and related_active_content not in too_recent_content:  # nopep8

                if not before_content or before_content_find:
                    active_contents.append(related_active_content)
                else:
                    too_recent_content.append(related_active_content)

                if before_content and related_active_content == before_content:
                    before_content_find = True

            if limit and len(active_contents) >= limit:
                break

        return active_contents

    # TODO - G.M - 2018-07-19 - Find a way to update this method to something
    # usable and efficient for tracim v2 to get content with read/unread status
    # instead of relying on has_new_information_for()
    # def get_last_unread(self, parent_id: typing.Optional[int], content_type: str,
    #                     workspace: Workspace=None, limit=10) -> typing.List[Content]:
    #     assert parent_id is None or isinstance(parent_id, int) # DYN_REMOVE
    #     assert content_type is not None# DYN_REMOVE
    #     assert isinstance(content_type, str) # DYN_REMOVE
    #
    #     read_revision_ids = self._session.query(RevisionReadStatus.revision_id) \
    #         .filter(RevisionReadStatus.user_id==self._user_id)
    #
    #     not_read_revisions = self._revisions_base_query(workspace) \
    #         .filter(~ContentRevisionRO.revision_id.in_(read_revision_ids)) \
    #         .filter(ContentRevisionRO.workspace_id == Workspace.workspace_id) \
    #         .filter(Workspace.is_deleted.is_(False)) \
    #         .subquery()
    #
    #     not_read_content_ids_query = self._session.query(
    #         distinct(not_read_revisions.c.content_id)
    #     )
    #     not_read_content_ids = list(map(
    #         itemgetter(0),
    #         not_read_content_ids_query,
    #     ))
    #
    #     not_read_contents = self._base_query(workspace) \
    #         .filter(Content.content_id.in_(not_read_content_ids)) \
    #         .order_by(desc(Content.updated))
    #
    #     if content_type != CONTENT_TYPES.Any_SLUG:
    #         not_read_contents = not_read_contents.filter(
    #             Content.type==content_type)
    #     else:
    #         not_read_contents = not_read_contents.filter(
    #             Content.type!=CONTENT_TYPES.Folder.slug)
    #
    #     if parent_id:
    #         not_read_contents = not_read_contents.filter(
    #             Content.parent_id==parent_id)
    #
    #     result = []
    #     for item in not_read_contents:
    #         new_item = None
    #         if CONTENT_TYPES.Comment.slug == item.type:
    #             new_item = item.parent
    #         else:
    #             new_item = item
    #
    #         # INFO - D.A. - 2015-05-20
    #         # We do not want to show only one item if the last 10 items are
    #         # comments about one thread for example
    #         if new_item not in result:
    #             result.append(new_item)
    #
    #         if len(result) >= limit:
    #             break
    #
    #     return result

    def _set_allowed_content(self, content: Content, allowed_content_dict: dict) -> None:  # nopep8
        """
        :param content: the given content instance
        :param allowed_content_dict: must be something like this:
            dict(
                folder = True,
                thread = True,
                file = False,
                page = True
            )
        :return: nothing
        """
        properties = content.properties.copy()
        properties['allowed_content'] = allowed_content_dict
        content.properties = properties

    def set_allowed_content(self, content: Content, allowed_content_type_slug_list: typing.List[str]) -> None:  # nopep8
        """
        :param content: the given content instance
        :param allowed_content_type_slug_list: list of content_type_slug to
        accept as subcontent.
        :return: nothing
        """
        allowed_content_dict = {}
        for allowed_content_type_slug in allowed_content_type_slug_list:
            if allowed_content_type_slug not in CONTENT_TYPES.endpoint_allowed_types_slug():
                raise ContentTypeNotExist('Content_type {} does not exist'.format(allowed_content_type_slug))  # nopep8
            allowed_content_dict[allowed_content_type_slug] = True

        self._set_allowed_content(content, allowed_content_dict)

    def restore_content_default_allowed_content(self, content: Content) -> None:
        """
        Return to default allowed_content_types
        :param content: the given content instance
        :return: nothing
        """
        if content._properties and 'allowed_content' in content._properties:
            properties = content.properties.copy()
            del properties['allowed_content']
            content.properties = properties

    def set_status(self, content: Content, new_status: str):
        if new_status in CONTENT_STATUS.get_all_slugs_values():
            content.status = new_status
            content.revision_type = ActionDescription.STATUS_UPDATE
        else:
            raise ValueError('The given value {} is not allowed'.format(new_status))

    def move(self,
             item: Content,
             new_parent: Content,
             must_stay_in_same_workspace: bool=True,
             new_workspace: Workspace=None,
    ):
        if must_stay_in_same_workspace:
            if new_parent and new_parent.workspace_id != item.workspace_id:
                raise ValueError('the item should stay in the same workspace')

        item.parent = new_parent
        if new_workspace:
            item.workspace = new_workspace
            if new_parent and \
                    new_parent.workspace_id != new_workspace.workspace_id:
                raise WorkspacesDoNotMatch(
                    'new parent workspace and new workspace should be the same.'
                )
        else:
            if new_parent:
                item.workspace = new_parent.workspace

        self._is_content_label_free_or_raise(
            item.label,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )
        item.revision_type = ActionDescription.MOVE

    def copy(
        self,
        item: Content,
        new_parent: Content=None,
        new_label: str=None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> Content:
        """
        Copy nearly all content, revision included. Children not included, see
        "copy_children" for this.
        :param item: Item to copy
        :param new_parent: new parent of the new copied item
        :param new_label: new label of the new copied item
        :param do_save: save the new copied item or not
        :param do_notify: notify copy or not
        :return: Newly copied item
        """
        if (not new_parent and not new_label) or (new_parent == item.parent and new_label == item.label):  # nopep8
            # TODO - G.M - 08-03-2018 - Use something other than ValueError
            raise ValueError("You can't copy file into itself")
        if new_parent:
            workspace = new_parent.workspace
            parent = new_parent
        else:
            workspace = item.workspace
            parent = item.parent
        label = new_label or item.label

        self._is_content_label_free_or_raise(label, workspace, parent)
        content = item.copy(parent)
        # INFO - GM - 15-03-2018 - add "copy" revision
        with new_revision(
            session=self._session,
            tm=transaction.manager,
            content=content,
            force_create_new_revision=True
        ) as rev:
            rev.parent = parent
            rev.workspace = workspace
            rev.label = label
            rev.revision_type = ActionDescription.COPY
            rev.properties['origin'] = {
                'content': item.id,
                'revision': item.last_revision.revision_id,
            }
        if do_save:
            self.save(content, ActionDescription.COPY, do_notify=do_notify)
        return content

    def copy_children(self, origin_content: Content, new_content: Content):
        for child in origin_content.children:
            self.copy(child, new_content)

    def move_recursively(self, item: Content,
                         new_parent: Content, new_workspace: Workspace):
        self.move(item, new_parent, False, new_workspace)
        self.save(item, do_notify=False)

        for child in item.children:
            with new_revision(
                session=self._session,
                tm=transaction.manager,
                content=child
            ):
                self.move_recursively(child, item, new_workspace)
        return

    def update_content(self, item: Content, new_label: str, new_content: str=None) -> Content:
        if item.label == new_label and item.description == new_content:
            # TODO - G.M - 20-03-2018 - Fix internationalization for webdav access.
            # Internationalization disabled in libcontent for now.
            raise SameValueError('The content did not change')
        if not new_label:
            raise EmptyLabelNotAllowed()

        self._is_content_label_free_or_raise(
            new_label,
            item.workspace,
            item.parent,
            exclude_content_id=item.content_id
        )

        item.owner = self._user
        item.label = new_label
        item.description = new_content if new_content else item.description  # TODO: convert urls into links
        item.revision_type = ActionDescription.EDITION
        return item

    def update_file_data(self, item: Content, new_filename: str, new_mimetype: str, new_content: bytes) -> Content:
        if new_mimetype == item.file_mimetype and \
                new_content == item.depot_file.file.read():
            raise SameValueError('The content did not change')
        item.owner = self._user
        item.file_name = new_filename
        item.file_mimetype = new_mimetype
        item.depot_file = FileIntent(
            new_content,
            new_filename,
            new_mimetype,
        )
        item.revision_type = ActionDescription.REVISION
        return item
1421
    def archive(self, content: Content):
        content.owner = self._user
        content.is_archived = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-archiving file.
        content.label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='archived',
            date=current_date_for_filename()
        )
        content.revision_type = ActionDescription.ARCHIVING

    def unarchive(self, content: Content):
        content.owner = self._user
        content.is_archived = False
        content.revision_type = ActionDescription.UNARCHIVING

    def delete(self, content: Content):
        content.owner = self._user
        content.is_deleted = True
        # TODO - G.M - 12-03-2018 - Inspect possible label conflict problem
        # INFO - G.M - 12-03-2018 - Set label name to avoid trouble when
        # un-deleting file.
        content.label = '{label}-{action}-{date}'.format(
            label=content.label,
            action='deleted',
            date=current_date_for_filename()
        )
        content.revision_type = ActionDescription.DELETION

    def undelete(self, content: Content):
        content.owner = self._user
        content.is_deleted = False
        content.revision_type = ActionDescription.UNDELETION

    def get_preview_page_nb(self, revision_id: int) -> typing.Optional[int]:
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            nb_pages = self.preview_manager.get_page_nb(file_path)
        except UnsupportedMimeType:
            return None
        return nb_pages

    def has_pdf_preview(self, revision_id: int) -> bool:
        file_path = self.get_one_revision_filepath(revision_id)
        try:
            has_pdf_preview = self.preview_manager.has_pdf_preview(file_path)
        except UnsupportedMimeType:
            has_pdf_preview = False
        return has_pdf_preview

    def mark_read__all(
            self,
            read_datetime: datetime.datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Mark as read the content of all workspaces visible to the user.
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        return self.mark_read__workspace(None, read_datetime, do_flush, recursive)  # nopep8

    def mark_read__workspace(
            self,
            workspace: Workspace,
            read_datetime: datetime.datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> None:
        """
        Mark as read the content of a workspace visible to the user.
        :param workspace: workspace whose content is marked as read
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: nothing
        """
        itemset = self.get_last_active(workspace)
        for item in itemset:
            if item.has_new_information_for(self._user):
                self.mark_read(item, read_datetime, do_flush, recursive)

    def mark_read(
            self,
            content: Content,
            read_datetime: datetime.datetime=None,
            do_flush: bool=True,
            recursive: bool=True
    ) -> Content:
        """
        Mark a content as read for the user.
        :param content: content to mark as read
        :param read_datetime: date of reading
        :param do_flush: flush database
        :param recursive: mark read subcontent too
        :return: the content marked as read
        """
        assert self._user
        assert content

        # The algorithm is:
        # 1. define the read datetime
        # 2. update all revisions related to current Content
        # 3. do the same for all child revisions
        #    (ie parent_id is content_id of current content)

        if not read_datetime:
            read_datetime = datetime.datetime.now()

        viewed_revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id == content.content_id).all()

        for revision in viewed_revisions:
            revision.read_by[self._user] = read_datetime

        if recursive:
            # mark read :
            # - all children
            # - parent stuff (if you mark a comment as read,
            #                 then you have seen the parent)
            # - parent comments
            for child in content.get_valid_children():
                self.mark_read(child, read_datetime=read_datetime,
                               do_flush=False)

            if CONTENT_TYPES.Comment.slug == content.type:
                self.mark_read(content.parent, read_datetime=read_datetime,
                               do_flush=False, recursive=False)
                for comment in content.parent.get_comments():
                    if comment != content:
                        self.mark_read(comment, read_datetime=read_datetime,
                                       do_flush=False, recursive=False)

        if do_flush:
            self.flush()

        return content

    def mark_unread(self, content: Content, do_flush=True) -> Content:
        assert self._user
        assert content

        revisions = self._session.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.content_id == content.content_id).all()

        for revision in revisions:
            try:
                del revision.read_by[self._user]
            except KeyError:
                pass

        for child in content.get_valid_children():
            self.mark_unread(child, do_flush=False)

        if do_flush:
            self.flush()

        return content

    def flush(self):
        self._session.flush()

    def save(self, content: Content, action_description: str=None, do_flush=True, do_notify=True):
        """
        Save an object, flush the session and set the revision_type property
        :param content: content to save
        :param action_description: revision type to set; must be one of
        ActionDescription.allowed_values() when given
        :param do_flush: add the content to the session, flush it and mark
        it as read
        :param do_notify: call do_notify() for the content after saving
        :return: nothing
        """
        assert action_description is None or action_description in ActionDescription.allowed_values()

        if not action_description:
            # See if the last action has been modified
            if content.revision_type is None \
                    or len(get_history(content.revision, 'revision_type')) <= 0:
                # The action has not been modified, so we set it to default edition
                action_description = ActionDescription.EDITION

        if action_description:
            content.revision_type = action_description

        if do_flush:
            # INFO - 2015-09-03 - D.A.
            # There are 2 flushes because of the use
            # of triggers for content creation
            #
            # (when creating a content, this is actually an insert of a new
            # revision in content_revisions; so the mark_read operation needs
            # to get the full real data from the database before being prepared.)

            self._session.add(content)
            self._session.flush()

            # TODO - 2015-09-03 - D.A. - Do not use triggers
            # We should create a new ContentRevisionRO object instead of Content
            # This would help managing view/not viewed status
            self.mark_read(content, do_flush=True)

        if do_notify:
            self.do_notify(content)

    def do_notify(self, content: Content):
        """
        Force notification for a given content. By default, it is
        called during the .save() operation
        :param content: content to notify about
        :return: nothing
        """
        NotifierFactory.create(
            config=self._config,
            current_user=self._user,
            session=self._session,
        ).notify_content_update(content)

    def get_keywords(self, search_string, search_string_separators=None) -> typing.List[str]:
        """
        :param search_string: a string of comma-separated keywords
        :param search_string_separators: regular expression used to split
        the string (defaults to ContentApi.SEARCH_SEPARATORS)
        :return: a list of str (each keyword = 1 entry)
        """

        search_string_separators = search_string_separators or ContentApi.SEARCH_SEPARATORS

        keywords = []
        if search_string:
            keywords = [keyword.strip() for keyword in re.split(search_string_separators, search_string)]

        return keywords

    def search(self, keywords: typing.List[str]) -> typing.Optional[Query]:
        """
        :param keywords: keywords to look for in labels and descriptions
        :return: a query on Content items whose label or description
        contains at least one keyword, or None if no keyword is given
        """

        if not keywords:
            return None

        filter_group_label = list(Content.label.ilike('%{}%'.format(keyword)) for keyword in keywords)
        filter_group_desc = list(Content.description.ilike('%{}%'.format(keyword)) for keyword in keywords)
        title_keyworded_items = self._hard_filtered_base_query().\
            filter(or_(*(filter_group_label+filter_group_desc))).\
            options(joinedload('children_revisions')).\
            options(joinedload('parent'))

        return title_keyworded_items

    def get_all_types(self) -> typing.List[ContentType]:
        labels = CONTENT_TYPES.endpoint_allowed_types_slug()
        content_types = []
        for label in labels:
            content_types.append(CONTENT_TYPES.get_one_by_slug(label))

        return content_types

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def exclude_unavailable(
        self,
        contents: typing.List[Content],
    ) -> typing.List[Content]:
        """
        Remove from the list, in place, every content located under an
        archived or deleted content, then return the list.
        :param contents: List of contents to parse
        :return: the same list, without the unavailable contents
        """
        for content in contents[:]:
            if self.content_under_deleted(content) or self.content_under_archived(content):
                contents.remove(content)
        return contents

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def content_under_deleted(self, content: Content) -> bool:
        if content.parent:
            if content.parent.is_deleted:
                return True
            if content.parent.parent:
                return self.content_under_deleted(content.parent)
        return False

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def content_under_archived(self, content: Content) -> bool:
        if content.parent:
            if content.parent.is_archived:
                return True
            if content.parent.parent:
                return self.content_under_archived(content.parent)
        return False

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def find_one_by_unique_property(
            self,
            property_name: str,
            property_value: str,
            workspace: Workspace=None,
    ) -> Content:
        """
        Return the Content that contains the given property.
        Raise sqlalchemy.orm.exc.MultipleResultsFound if more than one Content
        contains this property value.
        :param property_name: Name of property
        :param property_value: Value of property
        :param workspace: Workspace that contains the Content
        :return: Found Content
        """
        # TODO - 20160602 - Bastien: Should be JSON type query
        # see https://www.compose.io/articles/using-json-extensions-in-\
        # postgresql-from-python-2/
        query = self._base_query(workspace=workspace).filter(
            Content._properties.like(
                '%"{property_name}": "{property_value}"%'.format(
                    property_name=property_name,
                    property_value=property_value,
                )
            )
        )
        return query.one()

    # TODO - G.M - 2018-07-24 - [Cleanup] Is this method still needed?
    def generate_folder_label(
            self,
            workspace: Workspace,
            parent: Content=None,
    ) -> str:
        """
        Generate a folder label
        :param workspace: Future folder workspace
        :param parent: Parent of future folder (can be None)
        :return: Generated folder name
        """
        _ = self.translator.get_translation
        query = self._base_query(workspace=workspace)\
            .filter(Content.label.ilike('{0}%'.format(
                _('New folder'),
            )))
        if parent:
            query = query.filter(Content.parent == parent)

        return _('New folder {0}').format(
            query.count() + 1,
        )