Completed
Pull Request — develop (#31)
by inkhey
03:11
created

TracimRequest._get_current_workspace()   B

Complexity

Conditions 6

Size

Total Lines 33
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 33
rs 8.3466
c 0
b 0
f 0
cc 6
nop 3
1
# -*- coding: utf-8 -*-
2
from pyramid.request import Request
3
from sqlalchemy.orm.exc import NoResultFound
4
5
from tracim_backend.exceptions import NotAuthenticated
6
from tracim_backend.exceptions import UserNotActive
7
from tracim_backend.exceptions import ContentNotFound
8
from tracim_backend.exceptions import InvalidUserId
9
from tracim_backend.exceptions import InvalidWorkspaceId
10
from tracim_backend.exceptions import InvalidContentId
11
from tracim_backend.exceptions import InvalidCommentId
12
from tracim_backend.exceptions import ContentNotFoundInTracimRequest
13
from tracim_backend.exceptions import WorkspaceNotFoundInTracimRequest
14
from tracim_backend.exceptions import UserNotFoundInTracimRequest
15
from tracim_backend.exceptions import UserDoesNotExist
16
from tracim_backend.exceptions import WorkspaceNotFound
17
from tracim_backend.exceptions import ImmutableAttribute
18
from tracim_backend.app_models.contents import CONTENT_TYPES
19
from tracim_backend.lib.core.content import ContentApi
20
from tracim_backend.lib.core.user import UserApi
21
from tracim_backend.lib.core.workspace import WorkspaceApi
22
from tracim_backend.lib.utils.authorization import JSONDecodeError
23
24
from tracim_backend.models import User
25
from tracim_backend.models.data import Workspace
26
from tracim_backend.models.data import Content
27
28
29
class TracimRequest(Request):
30
    """
31
    Request with tracim specific params/methods
32
    """
33
    def __init__(
34
            self,
35
            environ,
36
            charset=None,
37
            unicode_errors=None,
38
            decode_param_names=None,
39
            **kw
40
    ):
41
        super().__init__(
42
            environ,
43
            charset,
44
            unicode_errors,
45
            decode_param_names,
46
            **kw
47
        )
48
        # Current comment, found in request path
49
        self._current_comment = None  # type: Content
50
51
        # Current content, found in request path
52
        self._current_content = None  # type: Content
53
54
        # Current workspace, found in request path
55
        self._current_workspace = None  # type: Workspace
56
57
        # Candidate workspace found in request body
58
        self._candidate_workspace = None  # type: Workspace
59
60
        # Authenticated user
61
        self._current_user = None  # type: User
62
63
        # User found from request headers, content, distinct from authenticated
64
        # user
65
        self._candidate_user = None  # type: User
66
67
        # INFO - G.M - 18-05-2018 - Close db at the end of the request
68
        self.add_finished_callback(self._cleanup)
69
70
    @property
71
    def current_workspace(self) -> Workspace:
72
        """
73
        Get current workspace of the request according to authentification and
74
        request headers (to retrieve workspace). Setted by default value the
75
        first time if not configured.
76
        :return: Workspace of the request
77
        """
78
        if self._current_workspace is None:
79
            self._current_workspace = self._get_current_workspace(self.current_user, self)   # nopep8
80
        return self._current_workspace
81
82
    @current_workspace.setter
83
    def current_workspace(self, workspace: Workspace) -> None:
84
        """
85
        Setting current_workspace
86
        :param workspace:
87
        :return:
88
        """
89
        if self._current_workspace is not None:
90
            raise ImmutableAttribute(
91
                "Can't modify already setted current_workspace"
92
            )
93
        self._current_workspace = workspace
94
95
    @property
96
    def current_user(self) -> User:
97
        """
98
        Get user from authentication mecanism.
99
        """
100
        if self._current_user is None:
101
            self.current_user = self._get_auth_safe_user(self)
102
        return self._current_user
103
104
    @current_user.setter
105
    def current_user(self, user: User) -> None:
106
        if self._current_user is not None:
107
            raise ImmutableAttribute(
108
                "Can't modify already setted current_user"
109
            )
110
        self._current_user = user
111
112
    @property
113
    def current_content(self) -> Content:
114
        """
115
        Get current  content from path
116
        """
117
        if self._current_content is None:
118
            self._current_content = self._get_current_content(
119
                self.current_user,
120
                self.current_workspace,
121
                self
122
                )
123
        return self._current_content
124
125
    @current_content.setter
126
    def current_content(self, content: Content) -> None:
127
        if self._current_content is not None:
128
            raise ImmutableAttribute(
129
                "Can't modify already setted current_content"
130
            )
131
        self._current_content = content
132
133
    @property
134
    def current_comment(self) -> Content:
135
        """
136
        Get current comment from path
137
        """
138
        if self._current_comment is None:
139
            self._current_comment = self._get_current_comment(
140
                self.current_user,
141
                self.current_workspace,
142
                self.current_content,
143
                self
144
                )
145
        return self._current_comment
146
147
    @current_comment.setter
148
    def current_comment(self, content: Content) -> None:
149
        if self._current_comment is not None:
150
            raise ImmutableAttribute(
151
                "Can't modify already setted current_content"
152
            )
153
        self._current_comment = content
154
    # TODO - G.M - 24-05-2018 - Find a better naming for this ?
155
156
    @property
157
    def candidate_user(self) -> User:
158
        """
159
        Get user from headers/body request. This user is not
160
        the one found by authentication mecanism. This user
161
        can help user to know about who one page is about in
162
        a similar way as current_workspace.
163
        """
164
        if self._candidate_user is None:
165
            self.candidate_user = self._get_candidate_user(self)
166
        return self._candidate_user
167
168
    @property
169
    def candidate_workspace(self) -> Workspace:
170
        """
171
        Get workspace from headers/body request. This workspace is not
172
        the one found from path. Its the one from json body.
173
        """
174
        if self._candidate_workspace is None:
175
            self._candidate_workspace = self._get_candidate_workspace(
176
                self.current_user,
177
                self
178
            )
179
        return self._candidate_workspace
180
181
    def _cleanup(self, request: 'TracimRequest') -> None:
182
        """
183
        Close dbsession at the end of the request in order to avoid exception
184
        about not properly closed session or "object created in another thread"
185
        issue
186
        see https://github.com/tracim/tracim_backend/issues/62
187
        :param request: same as self, request
188
        :return: nothing.
189
        """
190
        self._current_user = None
191
        self._current_workspace = None
192
        self.dbsession.close()
193
194
    @candidate_user.setter
195
    def candidate_user(self, user: User) -> None:
196
        if self._candidate_user is not None:
197
            raise ImmutableAttribute(
198
                "Can't modify already setted candidate_user"
199
            )
200
        self._candidate_user = user
201
202
    ###
203
    # Utils for TracimRequest
204
    ###
205
    def _get_current_comment(
206
            self,
207
            user: User,
208
            workspace: Workspace,
209
            content: Content,
210
            request: 'TracimRequest'
211
    ) -> Content:
212
        """
213
        Get current content from request
214
        :param user: User who want to check the workspace
215
        :param workspace: Workspace of the content
216
        :param content: comment is related to this content
217
        :param request: pyramid request
218
        :return: current content
219
        """
220
        comment_id = ''
221
        try:
222
            if 'comment_id' in request.matchdict:
223
                comment_id_str = request.matchdict['content_id']
224
                if not isinstance(comment_id_str, str) or not comment_id_str.isdecimal():  # nopep8
225
                    raise InvalidCommentId('comment_id is not a correct integer')  # nopep8
226
                comment_id = int(request.matchdict['comment_id'])
227
            if not comment_id:
228
                raise ContentNotFoundInTracimRequest('No comment_id property found in request')  # nopep8
229
            api = ContentApi(
230
                current_user=user,
231
                session=request.dbsession,
232
                show_deleted=True,
233
                show_archived=True,
234
                config=request.registry.settings['CFG']
235
            )
236
            comment = api.get_one(
237
                comment_id,
238
                content_type=CONTENT_TYPES.Comment.slug,
239
                workspace=workspace,
240
                parent=content,
241
            )
242
        except NoResultFound as exc:
243
            raise ContentNotFound(
244
                'Comment {} does not exist '
245
                'or is not visible for this user'.format(comment_id)
246
            ) from exc
247
        return comment
248
249
    def _get_current_content(
250
            self,
251
            user: User,
252
            workspace: Workspace,
253
            request: 'TracimRequest'
254
    ) -> Content:
255
        """
256
        Get current content from request
257
        :param user: User who want to check the workspace
258
        :param workspace: Workspace of the content
259
        :param request: pyramid request
260
        :return: current content
261
        """
262
        content_id = ''
263
        try:
264
            if 'content_id' in request.matchdict:
265
                content_id_str = request.matchdict['content_id']
266
                if not isinstance(content_id_str, str) or not content_id_str.isdecimal():  # nopep8
267
                    raise InvalidContentId('content_id is not a correct integer')  # nopep8
268
                content_id = int(request.matchdict['content_id'])
269
            if not content_id:
270
                raise ContentNotFoundInTracimRequest('No content_id property found in request')  # nopep8
271
            api = ContentApi(
272
                current_user=user,
273
                show_deleted=True,
274
                show_archived=True,
275
                session=request.dbsession,
276
                config=request.registry.settings['CFG']
277
            )
278
            content = api.get_one(content_id=content_id, workspace=workspace, content_type=CONTENT_TYPES.Any_SLUG)  # nopep8
279
        except NoResultFound as exc:
280
            raise ContentNotFound(
281
                'Content {} does not exist '
282
                'or is not visible for this user'.format(content_id)
283
            ) from exc
284
        return content
285
286
    def _get_candidate_user(
287
            self,
288
            request: 'TracimRequest',
289
    ) -> User:
290
        """
291
        Get candidate user
292
        :param request: pyramid request
293
        :return: user found from header/body
294
        """
295
        app_config = request.registry.settings['CFG']
296
        uapi = UserApi(None, show_deleted=True, session=request.dbsession, config=app_config)
297
        login = ''
298
        try:
299
            login = None
300
            if 'user_id' in request.matchdict:
301
                user_id_str = request.matchdict['user_id']
302
                if not isinstance(user_id_str, str) or not user_id_str.isdecimal():
303
                    raise InvalidUserId('user_id is not a correct integer')  # nopep8
304
                login = int(request.matchdict['user_id'])
305
            if not login:
306
                raise UserNotFoundInTracimRequest('You request a candidate user but the context not permit to found one')  # nopep8
307
            user = uapi.get_one(login)
308
        except UserNotFoundInTracimRequest as exc:
309
            raise UserDoesNotExist('User {} not found'.format(login)) from exc
310
        return user
311
312
    def _get_auth_safe_user(
313
            self,
314
            request: 'TracimRequest',
315
    ) -> User:
316
        """
317
        Get current pyramid authenticated user from request
318
        :param request: pyramid request
319
        :return: current authenticated user
320
        """
321
        app_config = request.registry.settings['CFG']
322
        uapi = UserApi(None, session=request.dbsession, config=app_config)
323
        login = ''
324
        try:
325
            login = request.authenticated_userid
326
            if not login:
327
                raise UserNotFoundInTracimRequest('You request a current user but the context not permit to found one')  # nopep8
328
            user = uapi.get_one_by_email(login)
329
            if not user.is_active:
330
                raise UserNotActive('User {} is not active'.format(login))
331
        except (UserDoesNotExist, UserNotFoundInTracimRequest) as exc:
332
            raise NotAuthenticated('User {} not found'.format(login)) from exc
333
        return user
334
335
    def _get_current_workspace(
336
            self,
337
            user: User,
338
            request: 'TracimRequest'
339
    ) -> Workspace:
340
        """
341
        Get current workspace from request
342
        :param user: User who want to check the workspace
343
        :param request: pyramid request
344
        :return: current workspace
345
        """
346
        workspace_id = ''
347
        try:
348
            if 'workspace_id' in request.matchdict:
349
                workspace_id_str = request.matchdict['workspace_id']
350
                if not isinstance(workspace_id_str, str) or not workspace_id_str.isdecimal():  # nopep8
351
                    raise InvalidWorkspaceId('workspace_id is not a correct integer')  # nopep8
352
                workspace_id = int(request.matchdict['workspace_id'])
353
            if not workspace_id:
354
                raise WorkspaceNotFoundInTracimRequest('No workspace_id property found in request')  # nopep8
355
            wapi = WorkspaceApi(
356
                current_user=user,
357
                session=request.dbsession,
358
                config=request.registry.settings['CFG'],
359
                show_deleted=True,
360
            )
361
            workspace = wapi.get_one(workspace_id)
362
        except NoResultFound as exc:
363
            raise WorkspaceNotFound(
364
                'Workspace {} does not exist '
365
                'or is not visible for this user'.format(workspace_id)
366
            ) from exc
367
        return workspace
368
369
    def _get_candidate_workspace(
370
            self,
371
            user: User,
372
            request: 'TracimRequest'
373
    ) -> Workspace:
374
        """
375
        Get current workspace from request
376
        :param user: User who want to check the workspace
377
        :param request: pyramid request
378
        :return: current workspace
379
        """
380
        workspace_id = ''
381
        try:
382
            if 'new_workspace_id' in request.json_body:
383
                workspace_id = request.json_body['new_workspace_id']
384
                if not isinstance(workspace_id, int):
385
                    if workspace_id.isdecimal():
386
                        workspace_id = int(workspace_id)
387
                    else:
388
                        raise InvalidWorkspaceId('workspace_id is not a correct integer')  # nopep8
389
            if not workspace_id:
390
                raise WorkspaceNotFoundInTracimRequest('No new_workspace_id property found in body')  # nopep8
391
            wapi = WorkspaceApi(
392
                current_user=user,
393
                session=request.dbsession,
394
                config=request.registry.settings['CFG'],
395
                show_deleted=True,
396
            )
397
            workspace = wapi.get_one(workspace_id)
398
        except JSONDecodeError as exc:
399
            raise WorkspaceNotFound('Invalid JSON content') from exc
400
        except NoResultFound as exc:
401
            raise WorkspaceNotFound(
402
                'Workspace {} does not exist '
403
                'or is not visible for this user'.format(workspace_id)
404
            ) from exc
405
        return workspace
406