Passed
Pull Request — develop (#42)
by inkhey
02:42
created

TracimRequest.current_content()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 12
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
from pyramid.request import Request
3
from pyramid.response import Response
4
from sqlalchemy.orm.exc import NoResultFound
5
6
from tracim_backend.exceptions import NotAuthenticated
7
from tracim_backend.exceptions import UserNotActive
8
from tracim_backend.exceptions import ContentNotFound
9
from tracim_backend.exceptions import InvalidUserId
10
from tracim_backend.exceptions import InvalidWorkspaceId
11
from tracim_backend.exceptions import InvalidContentId
12
from tracim_backend.exceptions import InvalidCommentId
13
from tracim_backend.exceptions import ContentNotFoundInTracimRequest
14
from tracim_backend.exceptions import WorkspaceNotFoundInTracimRequest
15
from tracim_backend.exceptions import UserNotFoundInTracimRequest
16
from tracim_backend.exceptions import UserDoesNotExist
17
from tracim_backend.exceptions import WorkspaceNotFound
18
from tracim_backend.exceptions import ImmutableAttribute
19
from tracim_backend.app_models.contents import CONTENT_TYPES
20
from tracim_backend.lib.core.content import ContentApi
21
from tracim_backend.lib.core.user import UserApi
22
from tracim_backend.lib.core.workspace import WorkspaceApi
23
from tracim_backend.lib.utils.authorization import JSONDecodeError
24
25
from tracim_backend.models import User
26
from tracim_backend.models.data import Workspace
27
from tracim_backend.models.data import Content
28
29
30
class TracimRequest(Request):
31
    """
32
    Request with tracim specific params/methods
33
    """
34
    def __init__(
            self,
            environ,
            charset=None,
            unicode_errors=None,
            decode_param_names=None,
            **kw
    ):
        """
        Build the pyramid request, then prepare the tracim specific state.

        Every cached attribute starts as None and is filled the first time
        the matching property is read (or explicitly set).
        """
        super().__init__(
            environ,
            charset,
            unicode_errors,
            decode_param_names,
            **kw
        )
        # --- users ---
        # User resolved by the authentication mechanism
        self._current_user = None  # type: User
        # User the request is about; distinct from the authenticated user
        self._candidate_user = None  # type: User

        # --- workspaces ---
        # Workspace designated by the request path
        self._current_workspace = None  # type: Workspace
        # Workspace designated by the request body
        self._candidate_workspace = None  # type: Workspace

        # --- contents ---
        # Content designated by the request path
        self._current_content = None  # type: Content
        # Comment designated by the request path
        self._current_comment = None  # type: Content

        # Value copied to the response Content-Disposition header when set
        self.response_content_disposition = None

        # INFO - G.M - 18-05-2018 - Close db at the end of the request
        self.add_finished_callback(self._cleanup)
        # Apply the stored response headers once a response exists
        self.add_response_callback(self._set_responses_headers)
72
73
    @property
    def current_workspace(self) -> Workspace:
        """
        Workspace the request is working on.

        Resolved lazily: on first access the workspace is looked up through
        ``_get_current_workspace`` for the current user and then cached.
        :return: Workspace of the request
        """
        cached = self._current_workspace
        if cached is not None:
            return cached
        self._current_workspace = self._get_current_workspace(
            self.current_user,
            self,
        )
        return self._current_workspace
84
85
    @current_workspace.setter
    def current_workspace(self, workspace: Workspace) -> None:
        """
        Store the current workspace; it can be set only once.
        :param workspace: workspace to attach to this request
        :raise ImmutableAttribute: if a current workspace is already stored
        :return: nothing
        """
        if self._current_workspace is None:
            self._current_workspace = workspace
            return
        raise ImmutableAttribute(
            "Can't modify already setted current_workspace"
        )
97
98
    @property
    def current_user(self) -> User:
        """
        Authenticated user of the request.

        Resolved through ``_get_auth_safe_user`` on first access, then
        cached for the following accesses.
        """
        if self._current_user is not None:
            return self._current_user
        # Go through the setter so the "set only once" rule is enforced
        self.current_user = self._get_auth_safe_user(self)
        return self._current_user
106
107
    @current_user.setter
    def current_user(self, user: User) -> None:
        """
        Store the authenticated user; it can be set only once.
        :param user: user to attach to this request
        :raise ImmutableAttribute: if a current user is already stored
        """
        if self._current_user is None:
            self._current_user = user
            return
        raise ImmutableAttribute(
            "Can't modify already setted current_user"
        )
114
115
    @property
    def current_content(self) -> Content:
        """
        Content designated by the request path.

        Resolved through ``_get_current_content`` on first access (using
        the current user and current workspace), then cached.
        """
        cached = self._current_content
        if cached is not None:
            return cached
        self._current_content = self._get_current_content(
            self.current_user,
            self.current_workspace,
            self,
        )
        return self._current_content
127
128
    @current_content.setter
    def current_content(self, content: Content) -> None:
        """
        Store the current content; it can be set only once.
        :param content: content to attach to this request
        :raise ImmutableAttribute: if a current content is already stored
        """
        if self._current_content is None:
            self._current_content = content
            return
        raise ImmutableAttribute(
            "Can't modify already setted current_content"
        )
135
136
    @property
    def current_comment(self) -> Content:
        """
        Comment designated by the request path.

        Resolved through ``_get_current_comment`` on first access (using
        the current user, workspace and content), then cached.
        """
        cached = self._current_comment
        if cached is not None:
            return cached
        self._current_comment = self._get_current_comment(
            self.current_user,
            self.current_workspace,
            self.current_content,
            self,
        )
        return self._current_comment
149
150
    @current_comment.setter
    def current_comment(self, content: Content) -> None:
        """
        Store the current comment; it can be set only once.
        :param content: comment (a Content) to attach to this request
        :raise ImmutableAttribute: if a current comment is already stored
        """
        if self._current_comment is not None:
            # The message names the attribute really being protected
            # (it previously mentioned current_content by mistake).
            raise ImmutableAttribute(
                "Can't modify already setted current_comment"
            )
        self._current_comment = content
157
    # TODO - G.M - 24-05-2018 - Find a better naming for this ?
158
159
    @property
    def candidate_user(self) -> User:
        """
        User the request is about, as opposed to the authenticated user.

        Resolved through ``_get_candidate_user`` on first access, then
        cached. It plays for users the role current_workspace plays for
        workspaces: it tells which user the page concerns.
        """
        if self._candidate_user is not None:
            return self._candidate_user
        # Go through the setter so the "set only once" rule is enforced
        self.candidate_user = self._get_candidate_user(self)
        return self._candidate_user
170
171
    @property
    def candidate_workspace(self) -> Workspace:
        """
        Workspace designated by the request body rather than by the path.

        Resolved through ``_get_candidate_workspace`` on first access for
        the current user, then cached.
        """
        cached = self._candidate_workspace
        if cached is not None:
            return cached
        self._candidate_workspace = self._get_candidate_workspace(
            self.current_user,
            self,
        )
        return self._candidate_workspace
183
184
    def response_download_mode(self, force_download: bool = False) -> None:
        """
        Ask for the response to be served as an attachment.
        :param force_download: when true, the stored content disposition
        becomes 'attachment'; when false nothing is changed
        """
        if not force_download:
            return
        self.response_content_disposition = 'attachment'
187
188
    def _cleanup(self, request: 'TracimRequest') -> None:
        """
        Finished-callback: drop cached objects and close the dbsession.

        Closing the session here avoids exceptions about a session not
        properly closed or the "object created in another thread" issue,
        see https://github.com/tracim/tracim_backend/issues/62
        :param request: same as self, request
        :return: nothing.
        """
        # NOTE(review): only the cached user and workspace are reset; the
        # cached content, comment and candidate_* values are left untouched
        # - confirm this is intended.
        self._current_user = self._current_workspace = None
        self.dbsession.close()
200
201
    def _set_responses_headers(self, request: 'TracimRequest', response: Response):  # nopep8
        """
        Response-callback: copy the stored content disposition, if any,
        onto the outgoing response.
        :param request: request carrying response_content_disposition
        :param response: response about to be sent
        """
        disposition = request.response_content_disposition
        if not disposition:
            return
        response.content_disposition = disposition
204
205
    @candidate_user.setter
    def candidate_user(self, user: User) -> None:
        """
        Store the candidate user; it can be set only once.
        :param user: user to attach to this request as candidate
        :raise ImmutableAttribute: if a candidate user is already stored
        """
        if self._candidate_user is None:
            self._candidate_user = user
            return
        raise ImmutableAttribute(
            "Can't modify already setted candidate_user"
        )
212
213
    ###
214
    # Utils for TracimRequest
215
    ###
216
    def _get_current_comment(
            self,
            user: User,
            workspace: Workspace,
            content: Content,
            request: 'TracimRequest'
    ) -> Content:
        """
        Get current comment from request
        :param user: User who want to check the workspace
        :param workspace: Workspace of the content
        :param content: comment is related to this content
        :param request: pyramid request
        :raise InvalidCommentId: comment_id in path is not a decimal string
        :raise ContentNotFoundInTracimRequest: no usable comment_id in path
        :raise ContentNotFound: no matching comment for this user
        :return: current comment
        """
        comment_id = ''
        try:
            if 'comment_id' in request.matchdict:
                # Validate the comment id itself. Reading 'content_id' here
                # (as was done before) checked the wrong route parameter and
                # let a malformed comment_id reach int() unvalidated.
                comment_id_str = request.matchdict['comment_id']
                if not isinstance(comment_id_str, str) or not comment_id_str.isdecimal():  # nopep8
                    raise InvalidCommentId('comment_id is not a correct integer')  # nopep8
                comment_id = int(comment_id_str)
            # Also true for an id of 0, not only for a missing parameter
            if not comment_id:
                raise ContentNotFoundInTracimRequest('No comment_id property found in request')  # nopep8
            api = ContentApi(
                current_user=user,
                session=request.dbsession,
                show_deleted=True,
                show_archived=True,
                config=request.registry.settings['CFG']
            )
            comment = api.get_one(
                comment_id,
                content_type=CONTENT_TYPES.Comment.slug,
                workspace=workspace,
                parent=content,
            )
        except NoResultFound as exc:
            raise ContentNotFound(
                'Comment {} does not exist '
                'or is not visible for this user'.format(comment_id)
            ) from exc
        return comment
259
260
    def _get_current_content(
            self,
            user: User,
            workspace: Workspace,
            request: 'TracimRequest'
    ) -> Content:
        """
        Find the content designated by ``content_id`` in the request path.
        :param user: User who want to check the workspace
        :param workspace: Workspace of the content
        :param request: pyramid request
        :raise InvalidContentId: content_id in path is not a decimal string
        :raise ContentNotFoundInTracimRequest: no usable content_id in path
        :raise ContentNotFound: no matching content for this user
        :return: current content
        """
        content_id = ''
        try:
            route_params = request.matchdict
            if 'content_id' in route_params:
                raw_id = route_params['content_id']
                if not (isinstance(raw_id, str) and raw_id.isdecimal()):
                    raise InvalidContentId('content_id is not a correct integer')  # nopep8
                content_id = int(raw_id)
            # Also true for an id of 0, not only for a missing parameter
            if not content_id:
                raise ContentNotFoundInTracimRequest('No content_id property found in request')  # nopep8
            content_api = ContentApi(
                current_user=user,
                show_deleted=True,
                show_archived=True,
                session=request.dbsession,
                config=request.registry.settings['CFG']
            )
            found = content_api.get_one(
                content_id=content_id,
                workspace=workspace,
                content_type=CONTENT_TYPES.Any_SLUG,
            )
        except NoResultFound as exc:
            raise ContentNotFound(
                'Content {} does not exist '
                'or is not visible for this user'.format(content_id)
            ) from exc
        return found
296
297
    def _get_candidate_user(
            self,
            request: 'TracimRequest',
    ) -> User:
        """
        Find the user designated by ``user_id`` in the request path.
        :param request: pyramid request
        :raise InvalidUserId: user_id in path is not a decimal string
        :raise UserDoesNotExist: no usable user_id in path
        :return: user found from the request
        """
        app_config = request.registry.settings['CFG']
        user_api = UserApi(
            None,
            show_deleted=True,
            session=request.dbsession,
            config=app_config,
        )
        candidate_id = None
        try:
            route_params = request.matchdict
            if 'user_id' in route_params:
                raw_id = route_params['user_id']
                if not (isinstance(raw_id, str) and raw_id.isdecimal()):
                    raise InvalidUserId('user_id is not a correct integer')  # nopep8
                candidate_id = int(raw_id)
            # Also true for an id of 0, not only for a missing parameter
            if not candidate_id:
                raise UserNotFoundInTracimRequest('You request a candidate user but the context not permit to found one')  # nopep8
            found = user_api.get_one(candidate_id)
        except UserNotFoundInTracimRequest as exc:
            raise UserDoesNotExist(
                'User {} not found'.format(candidate_id)
            ) from exc
        return found
322
323
    def _get_auth_safe_user(
            self,
            request: 'TracimRequest',
    ) -> User:
        """
        Find the active user matching the pyramid authenticated userid.
        :param request: pyramid request
        :raise NotAuthenticated: no authenticated userid, or no user for it
        :raise UserNotActive: the user exists but is not active
        :return: current authenticated user
        """
        app_config = request.registry.settings['CFG']
        user_api = UserApi(None, session=request.dbsession, config=app_config)
        userid = ''
        try:
            userid = request.authenticated_userid
            if not userid:
                raise UserNotFoundInTracimRequest('You request a current user but the context not permit to found one')  # nopep8
            # The authenticated userid is used as the user email
            found = user_api.get_one_by_email(userid)
            if not found.is_active:
                raise UserNotActive('User {} is not active'.format(userid))
        except (UserDoesNotExist, UserNotFoundInTracimRequest) as exc:
            raise NotAuthenticated(
                'User {} not found'.format(userid)
            ) from exc
        return found
345
346
    def _get_current_workspace(
            self,
            user: User,
            request: 'TracimRequest'
    ) -> Workspace:
        """
        Find the workspace designated by ``workspace_id`` in the request
        path.
        :param user: User who want to check the workspace
        :param request: pyramid request
        :raise InvalidWorkspaceId: workspace_id is not a decimal string
        :raise WorkspaceNotFoundInTracimRequest: no usable workspace_id
        :raise WorkspaceNotFound: no matching workspace for this user
        :return: current workspace
        """
        workspace_id = ''
        try:
            route_params = request.matchdict
            if 'workspace_id' in route_params:
                raw_id = route_params['workspace_id']
                if not (isinstance(raw_id, str) and raw_id.isdecimal()):
                    raise InvalidWorkspaceId('workspace_id is not a correct integer')  # nopep8
                workspace_id = int(raw_id)
            # Also true for an id of 0, not only for a missing parameter
            if not workspace_id:
                raise WorkspaceNotFoundInTracimRequest('No workspace_id property found in request')  # nopep8
            workspace_api = WorkspaceApi(
                current_user=user,
                session=request.dbsession,
                config=request.registry.settings['CFG'],
                show_deleted=True,
            )
            found = workspace_api.get_one(workspace_id)
        except NoResultFound as exc:
            raise WorkspaceNotFound(
                'Workspace {} does not exist '
                'or is not visible for this user'.format(workspace_id)
            ) from exc
        return found
379
380
    def _get_candidate_workspace(
            self,
            user: User,
            request: 'TracimRequest'
    ) -> Workspace:
        """
        Get candidate workspace from the ``new_workspace_id`` property of
        the request json body
        :param user: User who want to check the workspace
        :param request: pyramid request
        :raise InvalidWorkspaceId: new_workspace_id is neither an integer
        nor a decimal string
        :raise WorkspaceNotFoundInTracimRequest: no usable new_workspace_id
        :raise WorkspaceNotFound: body is not valid json, or no matching
        workspace for this user
        :return: candidate workspace
        """
        workspace_id = ''
        try:
            # Read the body once: each access to json_body may parse it
            json_body = request.json_body
            if 'new_workspace_id' in json_body:
                workspace_id = json_body['new_workspace_id']
                if not isinstance(workspace_id, int):
                    # Json can also carry null, floats, lists or objects:
                    # only strings have isdecimal(), so anything else is
                    # rejected as an invalid id instead of raising
                    # AttributeError.
                    if isinstance(workspace_id, str) and workspace_id.isdecimal():  # nopep8
                        workspace_id = int(workspace_id)
                    else:
                        raise InvalidWorkspaceId('workspace_id is not a correct integer')  # nopep8
            # Also true for an id of 0, not only for a missing property
            if not workspace_id:
                raise WorkspaceNotFoundInTracimRequest('No new_workspace_id property found in body')  # nopep8
            wapi = WorkspaceApi(
                current_user=user,
                session=request.dbsession,
                config=request.registry.settings['CFG'],
                show_deleted=True,
            )
            workspace = wapi.get_one(workspace_id)
        except JSONDecodeError as exc:
            raise WorkspaceNotFound('Invalid JSON content') from exc
        except NoResultFound as exc:
            raise WorkspaceNotFound(
                'Workspace {} does not exist '
                'or is not visible for this user'.format(workspace_id)
            ) from exc
        return workspace
417