Passed
Pull Request — develop (#92)
by inkhey
02:00
created

UserApi.get_one()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed, \
13
    UserCantDeleteHimself, UserCantChangeIsOwnProfile
14
from tracim_backend.exceptions import EmailAlreadyExistInDb
15
from tracim_backend.exceptions import EmailValidationFailed
16
from tracim_backend.exceptions import \
17
    NotificationDisabledCantCreateUserWithInvitation
18
from tracim_backend.exceptions import NotificationDisabledCantResetPassword
19
from tracim_backend.exceptions import NotificationSendingFailed
20
from tracim_backend.exceptions import NoUserSetted
21
from tracim_backend.exceptions import PasswordDoNotMatch
22
from tracim_backend.exceptions import TooShortAutocompleteString
23
from tracim_backend.exceptions import UnvalidResetPasswordToken
24
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
25
from tracim_backend.exceptions import UserDoesNotExist
26
from tracim_backend.exceptions import UserCantDisableHimself
27
from tracim_backend.exceptions import WrongUserPassword
28
from tracim_backend.lib.core.group import GroupApi
29
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
30
from tracim_backend.models.auth import Group
31
from tracim_backend.models.auth import User
32
from tracim_backend.models.context_models import TypeUser
33
from tracim_backend.models.context_models import UserInContext
34
from tracim_backend.models.data import UserRoleInWorkspace
35
36
37
class UserApi(object):
38
39
    def __init__(
40
            self,
41
            current_user: typing.Optional[User],
42
            session: Session,
43
            config: CFG,
44
            show_deleted: bool = False,
45
            show_deactivated: bool = True,
46
    ) -> None:
47
        self._session = session
48
        self._user = current_user
49
        self._config = config
50
        self._show_deleted = show_deleted
51
        self._show_deactivated = show_deactivated
52
53
    def _base_query(self):
        """
        Build the User query every filtered getter starts from.
        :return: query on User; deleted users are excluded unless
        show_deleted was set, inactive users are excluded unless
        show_deactivated was set
        """
        users = self._session.query(User)
        # INFO - "== False" / "== True" are intentional: they are evaluated
        # by SQLAlchemy to build SQL expressions, not by python.
        if not self._show_deleted:
            users = users.filter(User.is_deleted == False)
        if not self._show_deactivated:
            users = users.filter(User.is_active == True)
        return users
60
61
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User into a UserInContext bound to the session and the
        config of this api.
        :param user: user to wrap
        :return: UserInContext built from the given user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
71
72
    # Getters
73
74
    def get_one(self, user_id: int) -> User:
        """
        Fetch the user having the given id, using the filters of _base_query.
        :param user_id: id of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: if the query returns no row
        """
        try:
            return self._base_query().filter(User.user_id == user_id).one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
83
84
    def get_one_by_email(self, email: str) -> User:
        """
        Fetch the user having the given email, using the filters of
        _base_query.
        :param email: email of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: if the query returns no row
        """
        try:
            return self._base_query().filter(User.email == email).one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
95
96
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Fetch the user whose display_name equals the given public name,
        using the filters of _base_query.
        :param public_name: display name of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: if the query returns no row
        """
        # NOTE(review): only NoResultFound is translated here; what happens
        # when several users share the same display name should be checked.
        try:
            return self._base_query().filter(User.display_name == public_name).one()  # nopep8
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
105
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
106
107
    def get_one_by_id(self, id: int) -> User:
108
        return self.get_one(user_id=id)
109
110
    def get_current_user(self) -> User:
111
        """
112
        Get current_user
113
        """
114
        if not self._user:
115
            raise UserDoesNotExist('There is no current user')
116
        return self._user
117
118
    def _get_all_query(self) -> Query:
        """
        Build the query listing users ordered by display name.
        This query is built directly on the session: the show_deleted and
        show_deactivated filters of _base_query are not applied.
        :return: ordered query on User
        """
        every_user = self._session.query(User)
        return every_user.order_by(User.display_name)
120
121
    def get_all(self) -> typing.Iterable[User]:
122
        return self._get_all_query().all()
123
124
    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        Return the users known by the current user of this api whose display
        name or email contains the given string (case-insensitive match).
        :param acp: autocomplete filter by name/email, at least 2 characters
        :return: list of found users, ordered by display name
        :raise TooShortAutocompleteString: if acp has less than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )

        # NOTE - acp is inserted as-is in the pattern, so "%" and "_" given
        # by the caller act as wildcards.
        pattern = '%{}%'.format(acp)
        name_or_email_matches = or_(
            User.display_name.ilike(pattern),
            User.email.ilike(pattern),
        )
        known_users = self._base_query() \
            .order_by(User.display_name) \
            .filter(name_or_email_matches)

        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        if self._user and self._user.profile.id <= Group.TIM_USER:
            # ids of the workspaces the current user has a role in
            own_workspace_ids = self._session \
                .query(UserRoleInWorkspace.workspace_id) \
                .distinct(UserRoleInWorkspace.workspace_id) \
                .filter(UserRoleInWorkspace.user_id == self._user.user_id)
            # ids of the users having a role in one of these workspaces
            coworker_ids = self._session \
                .query(UserRoleInWorkspace.user_id) \
                .distinct(UserRoleInWorkspace.user_id) \
                .filter(UserRoleInWorkspace.workspace_id.in_(own_workspace_ids.subquery())) \
                .subquery()  # nopep8
            known_users = known_users.filter(User.user_id.in_(coworker_ids))
        return known_users.all()
153
154
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find an existing user from the given params.
        Params are tried in this order: user_id, email, public_name. A falsy
        param is skipped, a param matching no user falls through to the
        next one.
        :return: tuple of (kind of param which matched, found user)
        :raise UserDoesNotExist: if no param leads to a user
        """
        attempts = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, type_user in attempts:
            if not value:
                continue
            try:
                return type_user, getter(value)
            except UserDoesNotExist:
                # try the next kind of identifier
                continue

        raise UserDoesNotExist('User not found with any of given params.')
187
188
    # Check methods
189
190
    def user_with_email_exists(self, email: str) -> bool:
191
        try:
192
            self.get_one_by_email(email)
193
            return True
194
        # TODO - G.M - 09-04-2018 - Better exception
195
        except:
196
            return False
197
198
    def authenticate_user(self, email: str, password: str) -> User:
199
        """
200
        Authenticate user with email and password, raise AuthenticationFailed
201
        if uncorrect.
202
        :param email: email of the user
203
        :param password: cleartext password of the user
204
        :return: User who was authenticated.
205
        """
206
        try:
207
            user = self.get_one_by_email(email)
208
            if not user.is_active:
209
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
210
            if user.validate_password(password):
211
                return user
212
            else:
213
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
214
        except (WrongUserPassword, UserDoesNotExist) as exc:
215
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
216
217
    # Actions
218
    def set_password(
219
            self,
220
            user: User,
221
            loggedin_user_password: str,
222
            new_password: str,
223
            new_password2: str,
224
            do_save: bool=True
225
    ):
226
        """
227
        Set User password if logged-in user password is correct
228
        and both new_password are the same.
229
        :param user: User who need password changed
230
        :param loggedin_user_password: cleartext password of logged user (not
231
        same as user)
232
        :param new_password: new password for user
233
        :param new_password2: should be same as new_password
234
        :param do_save: should we save new user password ?
235
        :return:
236
        """
237
        if not self._user:
238
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
239
        if not self._user.validate_password(loggedin_user_password):  # nopep8
240
            raise WrongUserPassword(
241
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
242
            )
243
        if new_password != new_password2:
244
            raise PasswordDoNotMatch('Passwords given are different')
245
246
        self.update(
247
            user=user,
248
            password=new_password,
249
            do_save=do_save,
250
        )
251
        if do_save:
252
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
253
            self.save(user)
254
        return user
255
256
    def set_email(
257
            self,
258
            user: User,
259
            loggedin_user_password: str,
260
            email: str,
261
            do_save: bool = True
262
    ):
263
        """
264
        Set email address of user if loggedin user password is correct
265
        :param user: User who need email changed
266
        :param loggedin_user_password: cleartext password of logged user (not
267
        same as user)
268
        :param email:
269
        :param do_save:
270
        :return:
271
        """
272
        if not self._user:
273
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
274
        if not self._user.validate_password(loggedin_user_password):  # nopep8
275
            raise WrongUserPassword(
276
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
277
            )
278
        self.update(
279
            user=user,
280
            email=email,
281
            do_save=do_save,
282
        )
283
        return user
284
285
    def set_password_reset_token(
286
            self,
287
            user: User,
288
            new_password: str,
289
            new_password2: str,
290
            reset_token: str,
291
            do_save: bool = False,
292
    ):
293
        self.validate_reset_password_token(user, reset_token)
294
        if new_password != new_password2:
295
            raise PasswordDoNotMatch('Passwords given are different')
296
297
        self.update(
298
            user=user,
299
            password=new_password,
300
            do_save=do_save,
301
        )
302
        user.reset_tokens()
303
        if do_save:
304
            self.save(user)
305
        return user
306
307
    def _check_email(self, email: str) -> bool:
308
        """
309
        Check if email is completely ok to be used in user db table
310
        """
311
        is_email_correct = self._check_email_correctness(email)
312
        if not is_email_correct:
313
            raise EmailValidationFailed(
314
                'Email given form {} is uncorrect'.format(email))  # nopep8
315
        email_already_exist_in_db = self.check_email_already_in_db(email)
316
        if email_already_exist_in_db:
317
            raise EmailAlreadyExistInDb(
318
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
319
            )
320
        return True
321
322
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell whether the given email is already used by a user in db.
        The lookup is done directly on the session: the show_deleted and
        show_deactivated filters of _base_query are not applied.
        :param email: email to look for
        :return: True if at least one user row has this email
        """
        same_email = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email.count() != 0
327
328
    def _check_email_correctness(self, email: str) -> bool:
329
        """
330
           Verify if given email is correct:
331
           - check format
332
           - futur active check for email ? (dns based ?)
333
           """
334
        # TODO - G.M - 2018-07-05 - find a better way to check email
335
        if not email:
336
            return False
337
        email = email.split('@')
338
        if len(email) != 2:
339
            return False
340
        return True
341
342
    def update(
343
            self,
344
            user: User,
345
            name: str=None,
346
            email: str=None,
347
            password: str=None,
348
            timezone: str=None,
349
            lang: str=None,
350
            groups: typing.Optional[typing.List[Group]]=None,
351
            do_save=True,
352
    ) -> User:
353
        if name is not None:
354
            user.display_name = name
355
356
        if email is not None and email != user.email:
357
            self._check_email(email)
358
            user.email = email
359
360
        if password is not None:
361
            user.password = password
362
363
        if timezone is not None:
364
            user.timezone = timezone
365
366
        if lang is not None:
367
            user.lang = lang
368
369
        if groups is not None:
370
            if self._user and self._user == user:
371
                raise UserCantChangeIsOwnProfile(
372
                    "User {} can't change is own profile".format(user.user_id)
373
                )
374
            # INFO - G.M - 2018-07-18 - Delete old groups
375
            for group in user.groups:
376
                if group not in groups:
377
                    user.groups.remove(group)
378
            # INFO - G.M - 2018-07-18 - add new groups
379
            for group in groups:
380
                if group not in user.groups:
381
                    user.groups.append(group)
382
383
        if do_save:
384
            self.save(user)
385
386
        return user
387
388
    def create_user(
        self,
        email: str,
        password: str = None,
        name: str = None,
        timezone: str = '',
        lang: str = None,
        groups: typing.Optional[typing.List[Group]] = None,
        do_save: bool = True,
        do_notify: bool = True,
    ) -> User:
        """
        Create a new user and, if asked, send him the account creation
        notification mail.
        :param email: email of the new user
        :param password: password of the new user, also given to the
        notification mail
        :param name: display name; when None the one set by
        create_minimal_user is kept
        :param timezone: timezone of the new user
        :param lang: lang of the new user
        :param groups: groups of the new user; None or empty lets
        create_minimal_user pick its default group
        :param do_save: should we save the new user ?
        :param do_notify: should we send the account creation mail ?
        :return: the new user
        :raise NotificationDisabledCantCreateUserWithInvitation: if
        do_notify is set while email notifications are disabled in config
        :raise NotificationSendingFailed: if sending the mail raises an
        SMTPException
        """
        # INFO - the default of groups is None instead of a shared mutable
        # list; create_minimal_user treats None and [] the same way.
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            raise NotificationDisabledCantCreateUserWithInvitation(
                "Can't create user with invitation mail because "
                "notification are disabled."
            )
        new_user = self.create_minimal_user(email, groups, save_now=False)
        self.update(
            user=new_user,
            name=name,
            email=email,
            password=password,
            timezone=timezone,
            lang=lang,
            do_save=False,
        )
        if do_notify:
            try:
                email_manager = get_email_manager(self._config, self._session)
                email_manager.notify_created_account(
                    new_user,
                    password=password
                )
            except SMTPException as exc:
                raise NotificationSendingFailed(
                    "Notification for new created account can't be send "
                    "(SMTP error), new account creation aborted"
                ) from exc
        if do_save:
            self.save(new_user)
        return new_user
429
430
    def create_minimal_user(
            self,
            email,
            groups=None,
            save_now=False
    ) -> User:
        """
        Previous create_user method: create a user from his email only and
        add him to the session.
        :param email: email of the new user, checked with _check_email; the
        part before "@" is used as display name
        :param groups: groups of the new user; when None or empty, the
        group returned by GroupApi.get_one(Group.TIM_USER) is used
        :param save_now: should we flush the session right away ?
        :return: the new user
        :raise EmailValidationFailed: from _check_email
        :raise EmailAlreadyExistInDb: from _check_email
        """
        # INFO - the default of groups is None instead of a shared mutable
        # list; "not groups" below handles None and [] the same way.
        self._check_email(email)
        user = User()
        user.email = email
        user.display_name = email.split('@')[0]

        if not groups:
            gapi = GroupApi(
                current_user=self._user,  # User
                session=self._session,
                config=self._config,
            )
            groups = [gapi.get_one(Group.TIM_USER)]
        for group in groups:
            user.groups.append(group)

        self._session.add(user)

        if save_now:
            self._session.flush()

        return user
458
459
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for a user and send it to him by
        mail.
        :param user: user who wants his password reset
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabledCantResetPassword: if email notifications
        are disabled in config
        :raise NotificationSendingFailed: if sending the mail raises an
        SMTPException
        """
        notification_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notification_enabled:
            raise NotificationDisabledCantResetPassword("cant reset password with notification disabled")  # nopep8

        reset_token = user.generate_reset_password_token()
        try:
            mailer = get_email_manager(self._config, self._session)
            mailer.notify_reset_password(user, reset_token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc

        if do_save:
            self.save(user)
        return reset_token
477
478
    def validate_reset_password_token(self, user: User, token: str) -> bool:
479
        return user.validate_reset_password_token(
480
            token=token,
481
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
482
        )
483
484
    def enable(self, user: User, do_save=False):
485
        user.is_active = True
486
        if do_save:
487
            self.save(user)
488
489
    def disable(self, user: User, do_save=False):
490
        if self._user and self._user == user:
491
            raise UserCantDisableHimself(
492
                "User {} can't disable himself".format(user.user_id)
493
            )
494
495
        user.is_active = False
496
        if do_save:
497
            self.save(user)
498
499
    def delete(self, user: User, do_save=False):
500
        if self._user and self._user == user:
501
            raise UserCantDeleteHimself(
502
                "User {} can't delete himself".format(user.user_id)
503
            )
504
        user.is_deleted = True
505
        if do_save:
506
            self.save(user)
507
508
    def undelete(self, user: User, do_save=False):
509
        user.is_deleted = False
510
        if do_save:
511
            self.save(user)
512
513
    def save(self, user: User):
514
        self._session.flush()
515
516
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Execute actions when user just been created: ensure he has an auth
        token, then flush the session and commit the transaction.
        :param created_user: user who has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        #from tracim.lib.calendar import CalendarManager
        #from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        ensure_token = created_user.ensure_auth_token
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        ensure_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
543