Passed
Pull Request — develop (#92)
by inkhey
01:32
created

UserApi.authenticate_user()   A

Complexity

Conditions 4

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 18
rs 9.9
c 0
b 0
f 0
cc 4
nop 3
1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed, \
13
    UserCantDeleteHimself, UserCantChangeIsOwnProfile
14
from tracim_backend.exceptions import EmailAlreadyExistInDb
15
from tracim_backend.exceptions import EmailValidationFailed
16
from tracim_backend.exceptions import NotificationDisabled
17
from tracim_backend.exceptions import NotificationSendingFailed
18
from tracim_backend.exceptions import NoUserSetted
19
from tracim_backend.exceptions import PasswordDoNotMatch
20
from tracim_backend.exceptions import TooShortAutocompleteString
21
from tracim_backend.exceptions import UnvalidResetPasswordToken
22
from tracim_backend.exceptions import UserDoesNotExist
23
from tracim_backend.exceptions import UserAuthenticatedIsNotActive
24
from tracim_backend.exceptions import UserCantDisableHimself
25
from tracim_backend.exceptions import WrongUserPassword
26
from tracim_backend.lib.core.group import GroupApi
27
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
28
from tracim_backend.models.auth import Group
29
from tracim_backend.models.auth import User
30
from tracim_backend.models.context_models import TypeUser
31
from tracim_backend.models.context_models import UserInContext
32
from tracim_backend.models.data import UserRoleInWorkspace
33
34
35
class UserApi(object):
36
37
    def __init__(
38
            self,
39
            current_user: typing.Optional[User],
40
            session: Session,
41
            config: CFG,
42
            show_deleted: bool = False,
43
            show_deactivated: bool = True,
44
    ) -> None:
45
        self._session = session
46
        self._user = current_user
47
        self._config = config
48
        self._show_deleted = show_deleted
49
        self._show_deactivated = show_deactivated
50
51
    def _base_query(self):
        """
        Build the User query honouring the visibility flags of this api.
        :return: query on User; deleted users are filtered out unless
        show_deleted is set, inactive users are filtered out unless
        show_deactivated is set
        """
        criteria = []
        if not self._show_deleted:
            criteria.append(User.is_deleted == False)  # nopep8
        if not self._show_deactivated:
            criteria.append(User.is_active == True)  # nopep8

        query = self._session.query(User)
        for criterion in criteria:
            query = query.filter(criterion)
        return query
58
59
    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Wrap a User into a UserInContext using this api's session and config
        :param user: user to wrap
        :return: UserInContext built from the given user
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )
69
70
    # Getters
71
72
    def get_one(self, user_id: int) -> User:
        """
        Fetch the user having the given id
        :param user_id: id of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: when the query finds no such user
        """
        matching = self._base_query().filter(User.user_id == user_id)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(user_id)
            raise UserDoesNotExist(message) from exc
81
82
    def get_one_by_email(self, email: str) -> User:
        """
        Fetch the user having the given email
        :param email: Email of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: when the query finds no such user
        """
        matching = self._base_query().filter(User.email == email)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(email)
            raise UserDoesNotExist(message) from exc
93
94
    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Fetch the user whose display name equals the given public name
        :param public_name: display name of the wanted user
        :return: the matching user
        :raise UserDoesNotExist: when the query finds no such user
        """
        matching = self._base_query().filter(User.display_name == public_name)
        try:
            return matching.one()
        except NoResultFound as exc:
            message = 'User "{}" not found in database'.format(public_name)
            raise UserDoesNotExist(message) from exc
103
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
104
105
    def get_one_by_id(self, id: int) -> User:
106
        return self.get_one(user_id=id)
107
108
    def get_current_user(self) -> User:
109
        """
110
        Get current_user
111
        """
112
        if not self._user:
113
            raise UserDoesNotExist('There is no current user')
114
        return self._user
115
116
    def _get_all_query(self) -> Query:
        """
        Build the query listing users ordered by display name.
        The query is built directly from the session, so the show_deleted
        and show_deactivated flags are not applied here.
        """
        every_user = self._session.query(User)
        return every_user.order_by(User.display_name)
118
119
    def get_all(self) -> typing.Iterable[User]:
120
        return self._get_all_query().all()
121
122
    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        List the users whose display name or email contains the given string.
        When the current user is set and its profile id is not above
        Group.TIM_USER, only users sharing a workspace with it are returned.
        :param acp: autocomplete filter by name/email, at least 2 characters
        :return: List of found users, ordered by display name
        :raise TooShortAutocompleteString: when acp has fewer than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )

        pattern = '%{}%'.format(acp)
        name_or_email_matches = or_(
            User.display_name.ilike(pattern),
            User.email.ilike(pattern),
        )
        query = self._base_query()
        query = query.order_by(User.display_name)
        query = query.filter(name_or_email_matches)

        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        current = self._user
        if not (current and current.profile.id <= Group.TIM_USER):
            return query.all()

        # Ids of the workspaces the current user has a role in.
        own_workspace_ids = (
            self._session
            .query(UserRoleInWorkspace.workspace_id)
            .distinct(UserRoleInWorkspace.workspace_id)
            .filter(UserRoleInWorkspace.user_id == current.user_id)
        )
        # Ids of the users having a role in any of those workspaces.
        coworker_ids = (
            self._session
            .query(UserRoleInWorkspace.user_id)
            .distinct(UserRoleInWorkspace.user_id)
            .filter(UserRoleInWorkspace.workspace_id.in_(own_workspace_ids.subquery()))  # nopep8
            .subquery()
        )
        return query.filter(User.user_id.in_(coworker_ids)).all()
151
152
    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Look for an existing user using each given criterion in turn.
        Criteria are tried in this order: user_id, email, public_name.
        A criterion with a falsy value is skipped.
        :return: tuple (kind of criterion that matched, user found)
        :raise UserDoesNotExist: when no criterion leads to a user
        """
        attempts = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, kind in attempts:
            if not value:
                continue
            try:
                found = getter(value)
            except UserDoesNotExist:
                # Not found with this criterion: try the next one.
                continue
            return kind, found

        raise UserDoesNotExist('User not found with any of given params.')
185
186
    # Check methods
187
188
    def user_with_email_exists(self, email: str) -> bool:
189
        try:
190
            self.get_one_by_email(email)
191
            return True
192
        # TODO - G.M - 09-04-2018 - Better exception
193
        except:
194
            return False
195
196
    def authenticate_user(self, email: str, password: str) -> User:
197
        """
198
        Authenticate user with email and password, raise AuthenticationFailed
199
        if uncorrect.
200
        :param email: email of the user
201
        :param password: cleartext password of the user
202
        :return: User who was authenticated.
203
        """
204
        try:
205
            user = self.get_one_by_email(email)
206
            if not user.is_active:
207
                raise UserAuthenticatedIsNotActive('User "{}" is not active'.format(email))
208
            if user.validate_password(password):
209
                return user
210
            else:
211
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
212
        except (WrongUserPassword, UserDoesNotExist) as exc:
213
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
214
215
    # Actions
216
    def set_password(
217
            self,
218
            user: User,
219
            loggedin_user_password: str,
220
            new_password: str,
221
            new_password2: str,
222
            do_save: bool=True
223
    ):
224
        """
225
        Set User password if logged-in user password is correct
226
        and both new_password are the same.
227
        :param user: User who need password changed
228
        :param loggedin_user_password: cleartext password of logged user (not
229
        same as user)
230
        :param new_password: new password for user
231
        :param new_password2: should be same as new_password
232
        :param do_save: should we save new user password ?
233
        :return:
234
        """
235
        if not self._user:
236
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
237
        if not self._user.validate_password(loggedin_user_password):  # nopep8
238
            raise WrongUserPassword(
239
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
240
            )
241
        if new_password != new_password2:
242
            raise PasswordDoNotMatch('Passwords given are different')
243
244
        self.update(
245
            user=user,
246
            password=new_password,
247
            do_save=do_save,
248
        )
249
        if do_save:
250
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
251
            self.save(user)
252
        return user
253
254
    def set_email(
255
            self,
256
            user: User,
257
            loggedin_user_password: str,
258
            email: str,
259
            do_save: bool = True
260
    ):
261
        """
262
        Set email address of user if loggedin user password is correct
263
        :param user: User who need email changed
264
        :param loggedin_user_password: cleartext password of logged user (not
265
        same as user)
266
        :param email:
267
        :param do_save:
268
        :return:
269
        """
270
        if not self._user:
271
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
272
        if not self._user.validate_password(loggedin_user_password):  # nopep8
273
            raise WrongUserPassword(
274
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
275
            )
276
        self.update(
277
            user=user,
278
            email=email,
279
            do_save=do_save,
280
        )
281
        return user
282
283
    def set_password_reset_token(
284
            self,
285
            user: User,
286
            new_password: str,
287
            new_password2: str,
288
            reset_token: str,
289
            do_save: bool = False,
290
    ):
291
        self.validate_reset_password_token(user, reset_token)
292
        if new_password != new_password2:
293
            raise PasswordDoNotMatch('Passwords given are different')
294
295
        self.update(
296
            user=user,
297
            password=new_password,
298
            do_save=do_save,
299
        )
300
        user.reset_tokens()
301
        if do_save:
302
            self.save(user)
303
        return user
304
305
    def _check_email(self, email: str) -> bool:
306
        """
307
        Check if email is completely ok to be used in user db table
308
        """
309
        is_email_correct = self._check_email_correctness(email)
310
        if not is_email_correct:
311
            raise EmailValidationFailed(
312
                'Email given form {} is uncorrect'.format(email))  # nopep8
313
        email_already_exist_in_db = self.check_email_already_in_db(email)
314
        if email_already_exist_in_db:
315
            raise EmailAlreadyExistInDb(
316
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
317
            )
318
        return True
319
320
    def check_email_already_in_db(self, email: str) -> bool:
        """
        Tell whether the given email is already stored in the user table
        :param email: email to look for
        :return: True if at least one row has this email, False otherwise
        """
        same_email = self._session.query(User.email).filter(User.email == email)  # nopep8
        return same_email.count() != 0
325
326
    def _check_email_correctness(self, email: str) -> bool:
327
        """
328
           Verify if given email is correct:
329
           - check format
330
           - futur active check for email ? (dns based ?)
331
           """
332
        # TODO - G.M - 2018-07-05 - find a better way to check email
333
        if not email:
334
            return False
335
        email = email.split('@')
336
        if len(email) != 2:
337
            return False
338
        return True
339
340
    def update(
341
            self,
342
            user: User,
343
            name: str=None,
344
            email: str=None,
345
            password: str=None,
346
            timezone: str=None,
347
            lang: str=None,
348
            groups: typing.Optional[typing.List[Group]]=None,
349
            do_save=True,
350
    ) -> User:
351
        if name is not None:
352
            user.display_name = name
353
354
        if email is not None and email != user.email:
355
            self._check_email(email)
356
            user.email = email
357
358
        if password is not None:
359
            user.password = password
360
361
        if timezone is not None:
362
            user.timezone = timezone
363
364
        if lang is not None:
365
            user.lang = lang
366
367
        if groups is not None:
368
            if self._user and self._user == user:
369
                raise UserCantChangeIsOwnProfile(
370
                    "User {} can't change is own profile".format(user.user_id)
371
                )
372
            # INFO - G.M - 2018-07-18 - Delete old groups
373
            for group in user.groups:
374
                if group not in groups:
375
                    user.groups.remove(group)
376
            # INFO - G.M - 2018-07-18 - add new groups
377
            for group in groups:
378
                if group not in user.groups:
379
                    user.groups.append(group)
380
381
        if do_save:
382
            self.save(user)
383
384
        return user
385
386
    def create_user(
387
        self,
388
        email,
389
        password: str = None,
390
        name: str = None,
391
        timezone: str = '',
392
        lang: str= None,
393
        groups=[],
394
        do_save: bool=True,
395
        do_notify: bool=True,
396
    ) -> User:
397
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
398
            raise NotificationDisabled(
399
                "Can't create user with invitation mail because "
400
                "notification are disabled."
401
            )
402
        new_user = self.create_minimal_user(email, groups, save_now=False)
403
        self.update(
404
            user=new_user,
405
            name=name,
406
            email=email,
407
            password=password,
408
            timezone=timezone,
409
            lang=lang,
410
            do_save=False,
411
        )
412
        if do_notify:
413
            try:
414
                email_manager = get_email_manager(self._config, self._session)
415
                email_manager.notify_created_account(
416
                    new_user,
417
                    password=password
418
                )
419
            except SMTPException as exc:
420
                raise NotificationSendingFailed(
421
                    "Notification for new created account can't be send "
422
                    "(SMTP error), new account creation aborted"
423
                ) from exc
424
        if do_save:
425
            self.save(new_user)
426
        return new_user
427
428
    def create_minimal_user(
            self,
            email,
            groups=None,
            save_now=False
    ) -> User:
        """
        Create a user from its email only and add it to the session.
        The display name is the part of the email before the first '@'.
        :param email: email of the new user, checked with _check_email()
        :param groups: groups of the new user; when empty or None, the group
        returned by GroupApi.get_one(Group.TIM_USER) is used
        :param save_now: should we flush the session right away ?
        :return: the new user
        """
        # NOTE: groups defaults to None rather than to a list literal, which
        # would be one single object shared by every call.
        self._check_email(email)
        user = User()
        user.email = email
        user.display_name = email.split('@')[0]

        if not groups:
            gapi = GroupApi(
                current_user=self._user,  # User
                session=self._session,
                config=self._config,
            )
            groups = [gapi.get_one(Group.TIM_USER)]
        for group in groups:
            user.groups.append(group)

        self._session.add(user)

        if save_now:
            self._session.flush()

        return user
456
457
    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Generate a reset password token for a user and notify it by email
        :param user: User who want is password resetted
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabled: when email notifications are not
        activated in config
        :raise NotificationSendingFailed: when sending the notification fails
        with an SMTP error
        """
        notification_enabled = self._config.EMAIL_NOTIFICATION_ACTIVATED
        if not notification_enabled:
            raise NotificationDisabled("cant reset password with notification disabled")  # nopep8

        reset_token = user.generate_reset_password_token()
        try:
            get_email_manager(self._config, self._session).notify_reset_password(user, reset_token)  # nopep8
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc  # nopep8

        if do_save:
            self.save(user)
        return reset_token
475
476
    def validate_reset_password_token(self, user: User, token: str) -> bool:
477
        return user.validate_reset_password_token(
478
            token=token,
479
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
480
        )
481
482
    def enable(self, user: User, do_save=False):
483
        user.is_active = True
484
        if do_save:
485
            self.save(user)
486
487
    def disable(self, user: User, do_save=False):
488
        if self._user and self._user == user:
489
            raise UserCantDisableHimself(
490
                "User {} can't disable himself".format(user.user_id)
491
            )
492
493
        user.is_active = False
494
        if do_save:
495
            self.save(user)
496
497
    def delete(self, user: User, do_save=False):
498
        if self._user and self._user == user:
499
            raise UserCantDeleteHimself(
500
                "User {} can't delete himself".format(user.user_id)
501
            )
502
        user.is_deleted = True
503
        if do_save:
504
            self.save(user)
505
506
    def undelete(self, user: User, do_save=False):
507
        user.is_deleted = False
508
        if do_save:
509
            self.save(user)
510
511
    def save(self, user: User):
512
        self._session.flush()
513
514
    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Run the actions needed right after a user has been created: ensure
        it has an auth token, then flush the session and commit the
        transaction.
        :param created_user: user which has just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        #from tracim.lib.calendar import CalendarManager
        #from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        token_validity = self._config.USER_AUTH_TOKEN_VALIDITY
        created_user.ensure_auth_token(validity_seconds=token_validity)

        # Ensure database is up-to-date
        session = self._session
        session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
541