Completed
Push — develop ( 10efb0...b51ddf )
by Bastien
16s queued 14s
created

UserApi.set_email()   A

Complexity

Conditions 3

Size

Total Lines 28
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 28
rs 9.6
c 0
b 0
f 0
cc 3
nop 5
1
# -*- coding: utf-8 -*-
2
import typing as typing
3
from smtplib import SMTPException
4
5
import transaction
6
from sqlalchemy import or_
7
from sqlalchemy.orm import Query
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm.exc import NoResultFound
10
11
from tracim_backend.config import CFG
12
from tracim_backend.exceptions import AuthenticationFailed
13
from tracim_backend.exceptions import EmailAlreadyExistInDb
14
from tracim_backend.exceptions import EmailValidationFailed
15
from tracim_backend.exceptions import NotificationDisabled
16
from tracim_backend.exceptions import NotificationSendingFailed
17
from tracim_backend.exceptions import NoUserSetted
18
from tracim_backend.exceptions import PasswordDoNotMatch
19
from tracim_backend.exceptions import TooShortAutocompleteString
20
from tracim_backend.exceptions import UnvalidResetPasswordToken
21
from tracim_backend.exceptions import UserDoesNotExist
22
from tracim_backend.exceptions import UserNotActive
23
from tracim_backend.exceptions import WrongUserPassword
24
from tracim_backend.lib.core.group import GroupApi
25
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
26
from tracim_backend.models.auth import Group
27
from tracim_backend.models.auth import User
28
from tracim_backend.models.context_models import TypeUser
29
from tracim_backend.models.context_models import UserInContext
30
from tracim_backend.models.data import UserRoleInWorkspace
31
32
33
class UserApi(object):
    """
    Library API used to look up, authenticate, create and update User objects.

    All database access goes through the SQLAlchemy session given to the
    constructor. ``current_user`` is the user on whose behalf the API is used;
    it may be None, in which case methods requiring it raise.
    """

    def __init__(
            self,
            current_user: typing.Optional[User],
            session: Session,
            config: CFG,
            show_deleted: bool = False,
    ) -> None:
        """
        :param current_user: user using this api, None if there is none
        :param session: database session used for every query/flush
        :param config: application config (notification and token settings
        are read from it)
        :param show_deleted: if False, queries built on _base_query() exclude
        users flagged as deleted
        """
        self._session = session
        self._user = current_user
        self._config = config
        self._show_deleted = show_deleted

    def _base_query(self) -> Query:
        """
        Build the base User query, excluding deleted users unless
        show_deleted was requested.
        """
        query = self._session.query(User)
        if not self._show_deleted:
            # INFO - "== False" is required here: it builds a SQL expression,
            # it is not a python boolean comparison.
            query = query.filter(User.is_deleted == False)  # noqa: E712
        return query

    def get_user_with_context(self, user: User) -> UserInContext:
        """
        Return UserInContext object from User
        """
        return UserInContext(
            user=user,
            dbsession=self._session,
            config=self._config,
        )

    # Getters

    def get_one(self, user_id: int) -> User:
        """
        Get one user by user id
        :param user_id: id of the user
        :return: one user
        :raise UserDoesNotExist: if no user match this id
        """
        try:
            return self._base_query().filter(User.user_id == user_id).one()
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc  # nopep8

    def get_one_by_email(self, email: str) -> User:
        """
        Get one user by email
        :param email: Email of the user
        :return: one user
        :raise UserDoesNotExist: if no user match this email
        """
        try:
            return self._base_query().filter(User.email == email).one()
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc  # nopep8

    def get_one_by_public_name(self, public_name: str) -> User:
        """
        Get one user by public_name
        :param public_name: display name of the user
        :return: one user
        :raise UserDoesNotExist: if no user match this public name
        """
        try:
            return self._base_query().filter(User.display_name == public_name).one()  # nopep8
        except NoResultFound as exc:
            raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc  # nopep8

    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
    def get_one_by_id(self, id: int) -> User:
        """
        Alias of get_one(), kept for existing callers.
        """
        return self.get_one(user_id=id)

    def get_current_user(self) -> User:
        """
        Get current_user
        :raise UserDoesNotExist: if this api was built without current user
        """
        if not self._user:
            raise UserDoesNotExist('There is no current user')
        return self._user

    def _get_all_query(self) -> Query:
        """
        Query on all users, ordered by display name.
        """
        # NOTE(review): this does not go through _base_query(), so the
        # show_deleted setting is not applied here - confirm this is intended.
        return self._session.query(User).order_by(User.display_name)

    def get_all(self) -> typing.Iterable[User]:
        """
        Return all users, ordered by display name.
        """
        return self._get_all_query().all()

    def get_known_user(
            self,
            acp: str,
    ) -> typing.Iterable[User]:
        """
        Return list of know user by current UserApi user.
        :param acp: autocomplete filter by name/email
        :return: List of found users
        :raise TooShortAutocompleteString: if acp has less than 2 characters
        """
        if len(acp) < 2:
            raise TooShortAutocompleteString(
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
            )
        # Case-insensitive "contains" match on either display name or email.
        pattern = '%{}%'.format(acp)
        query = self._get_all_query().filter(
            or_(
                User.display_name.ilike(pattern),
                User.email.ilike(pattern),
            )
        )

        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
        # should show only user in same workspace as user
        if self._user and self._user.profile.id <= Group.TIM_USER:
            user_workspaces_id_query = self._session.\
                query(UserRoleInWorkspace.workspace_id).\
                distinct(UserRoleInWorkspace.workspace_id).\
                filter(UserRoleInWorkspace.user_id == self._user.user_id)
            users_in_workspaces = self._session.\
                query(UserRoleInWorkspace.user_id).\
                distinct(UserRoleInWorkspace.user_id).\
                filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery()  # nopep8
            query = query.filter(User.user_id.in_(users_in_workspaces))
        return query.all()

    def find(
            self,
            user_id: int=None,
            email: str=None,
            public_name: str=None
    ) -> typing.Tuple[TypeUser, User]:
        """
        Find existing user from all theses params.
        Check is made in this order: user_id, email, public_name
        If no user found raise UserDoesNotExist exception
        :return: tuple of (criterion which matched, user found)
        """
        lookups = (
            (user_id, self.get_one, TypeUser.USER_ID),
            (email, self.get_one_by_email, TypeUser.EMAIL),
            (public_name, self.get_one_by_public_name, TypeUser.PUBLIC_NAME),
        )
        for value, getter, type_user in lookups:
            # Criteria which are not given (None or empty) are skipped.
            if not value:
                continue
            try:
                return type_user, getter(value)
            except UserDoesNotExist:
                # No match with this criterion: try the next one.
                continue

        raise UserDoesNotExist('User not found with any of given params.')

    # Check methods

    def user_with_email_exists(self, email: str) -> bool:
        """
        Check if get_one_by_email() find a user for this email.
        :return: True if a user is found, False if it raise UserDoesNotExist
        """
        try:
            self.get_one_by_email(email)
        except UserDoesNotExist:
            # INFO - only the "not found" case means "does not exist"; any
            # other error (database failure, ...) is not hidden anymore.
            return False
        return True

    def authenticate_user(self, email: str, password: str) -> User:
        """
        Authenticate user with email and password, raise AuthenticationFailed
        if uncorrect.
        :param email: email of the user
        :param password: cleartext password of the user
        :return: User who was authenticated.
        :raise AuthenticationFailed: if user is unknown or password is wrong
        :raise UserNotActive: if user exist but is not active (this one is
        not converted to AuthenticationFailed)
        """
        try:
            user = self.get_one_by_email(email)
            if not user.is_active:
                raise UserNotActive('User "{}" is not active'.format(email))
            if not user.validate_password(password):
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
            return user
        except (WrongUserPassword, UserDoesNotExist) as exc:
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8

    # Actions

    def _check_loggedin_user_password(self, loggedin_user_password: str) -> None:  # nopep8
        """
        Ensure a current user is set and that given password is his password.
        :raise NoUserSetted: if this api was built without current user
        :raise WrongUserPassword: if password is not the current user one
        """
        if not self._user:
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
        if not self._user.validate_password(loggedin_user_password):
            raise WrongUserPassword(
                'Wrong password for authenticated user {}'.format(self._user.user_id)  # nopep8
            )

    def set_password(
            self,
            user: User,
            loggedin_user_password: str,
            new_password: str,
            new_password2: str,
            do_save: bool=True
    ):
        """
        Set User password if loggedin user password is correct
        and both new_password are the same.
        :param user: User who need password changed
        :param loggedin_user_password: cleartext password of logged user (not
        same as user)
        :param new_password: new password for user
        :param new_password2: should be same as new_password
        :param do_save: should we save new user password ?
        :return: updated user
        :raise NoUserSetted: if this api was built without current user
        :raise WrongUserPassword: if loggedin_user_password is wrong
        :raise PasswordDoNotMatch: if both new passwords are different
        """
        self._check_loggedin_user_password(loggedin_user_password)
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(
            user=user,
            password=new_password,
            do_save=do_save,
        )
        if do_save:
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
            self.save(user)
        return user

    def set_email(
            self,
            user: User,
            loggedin_user_password: str,
            email: str,
            do_save: bool = True
    ):
        """
        Set email address of user if loggedin user password is correct
        :param user: User who need email changed
        :param loggedin_user_password: cleartext password of logged user (not
        same as user)
        :param email: new email of the user
        :param do_save: should we save new user email ?
        :return: updated user
        :raise NoUserSetted: if this api was built without current user
        :raise WrongUserPassword: if loggedin_user_password is wrong
        :raise EmailValidationFailed: if new email format is incorrect
        :raise EmailAlreadyExistInDb: if new email is already used
        """
        self._check_loggedin_user_password(loggedin_user_password)
        self.update(
            user=user,
            email=email,
            do_save=do_save,
        )
        return user

    def set_password_reset_token(
            self,
            user: User,
            new_password: str,
            new_password2: str,
            reset_token: str,
            do_save: bool = False,
    ):
        """
        Set User password using a reset password token.
        :param user: User who need password changed
        :param new_password: new password for user
        :param new_password2: should be same as new_password
        :param reset_token: reset password token given to the user
        :param do_save: should we save new user password ?
        :return: updated user
        :raise PasswordDoNotMatch: if both new passwords are different
        """
        # NOTE(review): returned value is ignored here, presumably an invalid
        # token make the user model raise - confirm in User model.
        self.validate_reset_password_token(user, reset_token)
        if new_password != new_password2:
            raise PasswordDoNotMatch('Passwords given are different')

        self.update(
            user=user,
            password=new_password,
            do_save=do_save,
        )
        user.reset_tokens()
        if do_save:
            self.save(user)
        return user

    def _check_email(self, email: str) -> bool:
        """
        Check if email is completely ok to be used in user db table
        :return: True if email is ok, raise in other cases
        :raise EmailValidationFailed: if email format is incorrect
        :raise EmailAlreadyExistInDb: if email is already in user table
        """
        if not self._check_email_correctness(email):
            raise EmailValidationFailed(
                'Email given form {} is uncorrect'.format(email))  # nopep8
        if self.check_email_already_in_db(email):
            raise EmailAlreadyExistInDb(
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
            )
        return True

    def check_email_already_in_db(self, email: str) -> bool:
        """
        Verify if given email already exist in db
        :return: True if at least one user row has this email
        """
        # INFO - query is made directly on session: show_deleted filter is
        # not applied for this check.
        return self._session.query(User.email).filter(User.email == email).count() != 0  # nopep8

    def _check_email_correctness(self, email: str) -> bool:
        """
        Verify if given email is correct:
        - check format: exactly one "@", with something on both sides
        - futur active check for email ? (dns based ?)
        """
        # TODO - G.M - 2018-07-05 - find a better way to check email
        if not email:
            return False
        parts = email.split('@')
        if len(parts) != 2:
            return False
        local_part, domain = parts
        # "bob@" or "@domain" have only one "@" but can't be valid emails.
        return bool(local_part) and bool(domain)

    def update(
            self,
            user: User,
            name: str=None,
            email: str=None,
            password: str=None,
            timezone: str=None,
            lang: str=None,
            groups: typing.Optional[typing.List[Group]]=None,
            do_save: bool=True,
    ) -> User:
        """
        Update given user, only params which are not None are applied.
        :param user: user to update
        :param name: new display name
        :param email: new email, checked (format + unicity) only if different
        from current one
        :param password: new cleartext password
        :param timezone: new timezone
        :param lang: new lang
        :param groups: complete new list of groups of the user
        :param do_save: should we save the update ?
        :return: updated user
        :raise EmailValidationFailed: if new email format is incorrect
        :raise EmailAlreadyExistInDb: if new email is already used
        """
        if name is not None:
            user.display_name = name

        if email is not None and email != user.email:
            self._check_email(email)
            user.email = email

        if password is not None:
            user.password = password

        if timezone is not None:
            user.timezone = timezone

        if lang is not None:
            user.lang = lang

        if groups is not None:
            # INFO - G.M - 2018-07-18 - Delete old groups
            # Iterate on a copy: removing from a list while iterating on it
            # skip the item following each removed one.
            for group in list(user.groups):
                if group not in groups:
                    user.groups.remove(group)
            # INFO - G.M - 2018-07-18 - add new groups
            for group in groups:
                if group not in user.groups:
                    user.groups.append(group)

        if do_save:
            self.save(user)

        return user

    def create_user(
        self,
        email,
        password: str = None,
        name: str = None,
        timezone: str = '',
        lang: str = None,
        groups: typing.Optional[typing.List[Group]] = None,
        do_save: bool=True,
        do_notify: bool=True,
    ) -> User:
        """
        Create a new user and, if asked, notify him by email.
        :param email: email of the new user
        :param password: cleartext password of the new user
        :param name: display name of the new user
        :param timezone: timezone of the new user
        :param lang: lang of the new user
        :param groups: groups of the new user, see create_minimal_user() for
        the default
        :param do_save: should we save the new user ?
        :param do_notify: should we send the "created account" email ?
        :return: created user
        :raise NotificationDisabled: if do_notify is set but email
        notifications are disabled in config
        :raise NotificationSendingFailed: if sending email fail (SMTP error)
        :raise EmailValidationFailed: if email format is incorrect
        :raise EmailAlreadyExistInDb: if email is already used
        """
        if do_notify and not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            raise NotificationDisabled(
                "Can't create user with invitation mail because "
                "notification are disabled."
            )
        new_user = self.create_minimal_user(email, groups, save_now=False)
        self.update(
            user=new_user,
            name=name,
            email=email,
            password=password,
            timezone=timezone,
            lang=lang,
            do_save=False,
        )
        if do_notify:
            try:
                email_manager = get_email_manager(self._config, self._session)
                email_manager.notify_created_account(
                    new_user,
                    password=password
                )
            except SMTPException as exc:
                raise NotificationSendingFailed(
                    "Notification for new created account can't be send "
                    "(SMTP error), new account creation aborted"
                ) from exc
        if do_save:
            self.save(new_user)
        return new_user

    def create_minimal_user(
            self,
            email,
            groups: typing.Optional[typing.List[Group]] = None,
            save_now=False
    ) -> User:
        """
        Create a user with only email, display name (part of the email
        before "@") and groups set, and add it to the session.
        (Previous create_user method)
        :param email: email of the new user
        :param groups: groups of the new user, if empty or None the group
        Group.TIM_USER is used
        :param save_now: should we flush the session now ?
        :return: created user
        :raise EmailValidationFailed: if email format is incorrect
        :raise EmailAlreadyExistInDb: if email is already used
        """
        self._check_email(email)
        user = User()
        user.email = email
        user.display_name = email.split('@')[0]

        if not groups:
            gapi = GroupApi(
                current_user=self._user,  # User
                session=self._session,
                config=self._config,
            )
            groups = [gapi.get_one(Group.TIM_USER)]
        for group in groups:
            user.groups.append(group)

        self._session.add(user)

        if save_now:
            self._session.flush()

        return user

    def reset_password_notification(self, user: User, do_save: bool=False) -> str:  # nopep8
        """
        Reset password notification
        :param user: User who want is password resetted
        :param do_save: save update ?
        :return: reset_password_token
        :raise NotificationDisabled: if email notifications are disabled
        :raise NotificationSendingFailed: if sending email fail (SMTP error)
        """
        if not self._config.EMAIL_NOTIFICATION_ACTIVATED:
            raise NotificationDisabled("cant reset password with notification disabled")  # nopep8
        token = user.generate_reset_password_token()
        try:
            email_manager = get_email_manager(self._config, self._session)
            email_manager.notify_reset_password(user, token)
        except SMTPException as exc:
            raise NotificationSendingFailed("SMTP error, can't send notification") from exc  # nopep8
        if do_save:
            self.save(user)
        return token

    def validate_reset_password_token(self, user: User, token: str) -> bool:
        """
        Validate reset password token of user, using token validity
        duration from config (USER_RESET_PASSWORD_TOKEN_VALIDITY).
        :return: result of user.validate_reset_password_token()
        """
        return user.validate_reset_password_token(
            token=token,
            validity_seconds=self._config.USER_RESET_PASSWORD_TOKEN_VALIDITY,
        )

    def enable(self, user: User, do_save=False) -> None:
        """
        Set user as active
        """
        user.is_active = True
        if do_save:
            self.save(user)

    def disable(self, user: User, do_save=False) -> None:
        """
        Set user as not active
        """
        user.is_active = False
        if do_save:
            self.save(user)

    def delete(self, user: User, do_save=False) -> None:
        """
        Flag user as deleted (user is not removed from database here)
        """
        user.is_deleted = True
        if do_save:
            self.save(user)

    def undelete(self, user: User, do_save=False) -> None:
        """
        Remove deleted flag of user
        """
        user.is_deleted = False
        if do_save:
            self.save(user)

    def save(self, user: User) -> None:
        """
        Flush the session. The user param is not used: all pending changes
        of the session are flushed, not only those of this user.
        """
        self._session.flush()

    def execute_created_user_actions(self, created_user: User) -> None:
        """
        Execute actions when user just been created
        :return:
        """
        # NOTE: Cyclic import
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # from tracim.lib.calendar import CalendarManager
        # from tracim.model.organisational import UserCalendar

        # TODO - G.M - 04-04-2018 - [auth]
        # Check if this is already needed with
        # new auth system
        created_user.ensure_auth_token(
            validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
        )

        # Ensure database is up-to-date
        self._session.flush()
        transaction.commit()

        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
        # calendar_manager = CalendarManager(created_user)
        # calendar_manager.create_then_remove_fake_event(
        #     calendar_class=UserCalendar,
        #     related_object_id=created_user.user_id,
        # )
522