Completed
Pull Request — develop (#31)
by inkhey
20:37 queued 14:44
created

UserApi.set_email()   A

Complexity

Conditions 3

Size

Total Lines 28
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 28
rs 9.6
c 0
b 0
f 0
cc 3
nop 5
1
# -*- coding: utf-8 -*-
2
from smtplib import SMTPException
3
4
import transaction
5
import typing as typing
6
7
from sqlalchemy import or_
8
from sqlalchemy.orm import Session
9
from sqlalchemy.orm import Query
10
from sqlalchemy.orm.exc import NoResultFound
11
12
from tracim_backend.config import CFG
13
from tracim_backend.models.auth import User
14
from tracim_backend.models.auth import Group
15
from tracim_backend.exceptions import NoUserSetted
16
from tracim_backend.exceptions import EmailAlreadyExistInDb
17
from tracim_backend.exceptions import TooShortAutocompleteString
18
from tracim_backend.exceptions import PasswordDoNotMatch
19
from tracim_backend.exceptions import EmailValidationFailed
20
from tracim_backend.exceptions import UserDoesNotExist
21
from tracim_backend.exceptions import WrongUserPassword
22
from tracim_backend.exceptions import AuthenticationFailed
23
from tracim_backend.exceptions import NotificationNotSend
24
from tracim_backend.exceptions import UserNotActive
25
from tracim_backend.models.context_models import UserInContext
26
from tracim_backend.lib.mail_notifier.notifier import get_email_manager
27
from tracim_backend.models.context_models import TypeUser
28
from tracim_backend.models.data import UserRoleInWorkspace
29
30
31
class UserApi(object):
32
33
    def __init__(
34
            self,
35
            current_user: typing.Optional[User],
36
            session: Session,
37
            config: CFG,
38
            show_deleted: bool = False,
39
    ) -> None:
40
        self._session = session
41
        self._user = current_user
42
        self._config = config
43
        self._show_deleted = show_deleted
44
45
    def _base_query(self):
46
        query = self._session.query(User)
47
        if not self._show_deleted:
48
            query = query.filter(User.is_deleted == False)
49
        return query
50
51
    def get_user_with_context(self, user: User) -> UserInContext:
52
        """
53
        Return UserInContext object from User
54
        """
55
        user = UserInContext(
56
            user=user,
57
            dbsession=self._session,
58
            config=self._config,
59
        )
60
        return user
61
62
    # Getters
63
64
    def get_one(self, user_id: int) -> User:
65
        """
66
        Get one user by user id
67
        """
68
        try:
69
            user = self._base_query().filter(User.user_id == user_id).one()
70
        except NoResultFound as exc:
71
            raise UserDoesNotExist('User "{}" not found in database'.format(user_id)) from exc  # nopep8
72
        return user
73
74
    def get_one_by_email(self, email: str) -> User:
75
        """
76
        Get one user by email
77
        :param email: Email of the user
78
        :return: one user
79
        """
80
        try:
81
            user = self._base_query().filter(User.email == email).one()
82
        except NoResultFound as exc:
83
            raise UserDoesNotExist('User "{}" not found in database'.format(email)) from exc  # nopep8
84
        return user
85
86
    def get_one_by_public_name(self, public_name: str) -> User:
87
        """
88
        Get one user by public_name
89
        """
90
        try:
91
            user = self._base_query().filter(User.display_name == public_name).one()
92
        except NoResultFound as exc:
93
            raise UserDoesNotExist('User "{}" not found in database'.format(public_name)) from exc  # nopep8
94
        return user
95
    # FIXME - G.M - 24-04-2018 - Duplicate method with get_one.
96
97
    def get_one_by_id(self, id: int) -> User:
98
        return self.get_one(user_id=id)
99
100
    def get_current_user(self) -> User:
101
        """
102
        Get current_user
103
        """
104
        if not self._user:
105
            raise UserDoesNotExist('There is no current user')
106
        return self._user
107
108
    def _get_all_query(self) -> Query:
109
        return self._session.query(User).order_by(User.display_name)
110
111
    def get_all(self) -> typing.Iterable[User]:
112
        return self._get_all_query().all()
113
114
    def get_known_user(
115
            self,
116
            acp: str,
117
    ) -> typing.Iterable[User]:
118
        """
119
        Return list of know user by current UserApi user.
120
        :param acp: autocomplete filter by name/email
121
        :return: List of found users
122
        """
123
        if len(acp) < 2:
124
            raise TooShortAutocompleteString(
125
                '"{acp}" is a too short string, acp string need to have more than one character'.format(acp=acp)  # nopep8
126
            )
127
        query = self._get_all_query()
128
        query = query.filter(or_(User.display_name.ilike('%{}%'.format(acp)), User.email.ilike('%{}%'.format(acp))))  # nopep8
129
130
        # INFO - G.M - 2018-07-27 - if user is set and is simple user, we
131
        # should show only user in same workspace as user
132
        if self._user and self._user.profile.id <= Group.TIM_USER:
133
            user_workspaces_id_query = self._session.\
134
                query(UserRoleInWorkspace.workspace_id).\
135
                distinct(UserRoleInWorkspace.workspace_id).\
136
                filter(UserRoleInWorkspace.user_id == self._user.user_id)
137
            users_in_workspaces = self._session.\
138
                query(UserRoleInWorkspace.user_id).\
139
                distinct(UserRoleInWorkspace.user_id).\
140
                filter(UserRoleInWorkspace.workspace_id.in_(user_workspaces_id_query.subquery())).subquery()  # nopep8
141
            query = query.filter(User.user_id.in_(users_in_workspaces))
142
        return query.all()
143
144
    def find(
145
            self,
146
            user_id: int=None,
147
            email: str=None,
148
            public_name: str=None
149
    ) -> typing.Tuple[TypeUser, User]:
150
        """
151
        Find existing user from all theses params.
152
        Check is made in this order: user_id, email, public_name
153
        If no user found raise UserDoesNotExist exception
154
        """
155
        user = None
156
157
        if user_id:
158
            try:
159
                user = self.get_one(user_id)
160
                return TypeUser.USER_ID, user
161
            except UserDoesNotExist:
162
                pass
163
        if email:
164
            try:
165
                user = self.get_one_by_email(email)
166
                return TypeUser.EMAIL, user
167
            except UserDoesNotExist:
168
                pass
169
        if public_name:
170
            try:
171
                user = self.get_one_by_public_name(public_name)
172
                return TypeUser.PUBLIC_NAME, user
173
            except UserDoesNotExist:
174
                pass
175
176
        raise UserDoesNotExist('User not found with any of given params.')
177
178
    # Check methods
179
180
    def user_with_email_exists(self, email: str) -> bool:
181
        try:
182
            self.get_one_by_email(email)
183
            return True
184
        # TODO - G.M - 09-04-2018 - Better exception
185
        except:
186
            return False
187
188
    def authenticate_user(self, email: str, password: str) -> User:
189
        """
190
        Authenticate user with email and password, raise AuthenticationFailed
191
        if uncorrect.
192
        :param email: email of the user
193
        :param password: cleartext password of the user
194
        :return: User who was authenticated.
195
        """
196
        try:
197
            user = self.get_one_by_email(email)
198
            if not user.is_active:
199
                raise UserNotActive('User "{}" is not active'.format(email))
200
            if user.validate_password(password):
201
                return user
202
            else:
203
                raise WrongUserPassword('User "{}" password is incorrect'.format(email))  # nopep8
204
        except (WrongUserPassword, UserDoesNotExist) as exc:
205
            raise AuthenticationFailed('User "{}" authentication failed'.format(email)) from exc  # nopep8
206
207
    # Actions
208
    def set_password(
209
            self,
210
            user: User,
211
            loggedin_user_password: str,
212
            new_password: str,
213
            new_password2: str,
214
            do_save: bool=True
215
    ):
216
        """
217
        Set User password if loggedin user password is correct
218
        and both new_password are the same.
219
        :param user: User who need password changed
220
        :param loggedin_user_password: cleartext password of logged user (not
221
        same as user)
222
        :param new_password: new password for user
223
        :param new_password2: should be same as new_password
224
        :param do_save: should we save new user password ?
225
        :return:
226
        """
227
        if not self._user:
228
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
229
        if not self._user.validate_password(loggedin_user_password):  # nopep8
230
            raise WrongUserPassword(
231
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
232
            )
233
        if new_password != new_password2:
234
            raise PasswordDoNotMatch('Passwords given are different')
235
236
        self.update(
237
            user=user,
238
            password=new_password,
239
            do_save=do_save,
240
        )
241
        if do_save:
242
            # TODO - G.M - 2018-07-24 - Check why commit is needed here
243
            self.save(user)
244
        return user
245
246
    def set_email(
247
            self,
248
            user: User,
249
            loggedin_user_password: str,
250
            email: str,
251
            do_save: bool = True
252
    ):
253
        """
254
        Set email address of user if loggedin user password is correct
255
        :param user: User who need email changed
256
        :param loggedin_user_password: cleartext password of logged user (not
257
        same as user)
258
        :param email:
259
        :param do_save:
260
        :return:
261
        """
262
        if not self._user:
263
            raise NoUserSetted('Current User should be set in UserApi to use this method')  # nopep8
264
        if not self._user.validate_password(loggedin_user_password):  # nopep8
265
            raise WrongUserPassword(
266
                'Wrong password for authenticated user {}'. format(self._user.user_id)  # nopep8
267
            )
268
        self.update(
269
            user=user,
270
            email=email,
271
            do_save=do_save,
272
        )
273
        return user
274
275
    def _check_email(self, email: str) -> bool:
276
        """
277
        Check if email is completely ok to be used in user db table
278
        """
279
        is_email_correct = self._check_email_correctness(email)
280
        if not is_email_correct:
281
            raise EmailValidationFailed(
282
                'Email given form {} is uncorrect'.format(email))  # nopep8
283
        email_already_exist_in_db = self.check_email_already_in_db(email)
284
        if email_already_exist_in_db:
285
            raise EmailAlreadyExistInDb(
286
                'Email given {} already exist, please choose something else'.format(email)  # nopep8
287
            )
288
        return True
289
290
    def check_email_already_in_db(self, email: str) -> bool:
291
        """
292
        Verify if given email does not already exist in db
293
        """
294
        return self._session.query(User.email).filter(User.email==email).count() != 0  # nopep8
295
296
    def _check_email_correctness(self, email: str) -> bool:
297
        """
298
           Verify if given email is correct:
299
           - check format
300
           - futur active check for email ? (dns based ?)
301
           """
302
        # TODO - G.M - 2018-07-05 - find a better way to check email
303
        if not email:
304
            return False
305
        email = email.split('@')
306
        if len(email) != 2:
307
            return False
308
        return True
309
310
    def update(
311
            self,
312
            user: User,
313
            name: str=None,
314
            email: str=None,
315
            password: str=None,
316
            timezone: str=None,
317
            lang: str=None,
318
            groups: typing.Optional[typing.List[Group]]=None,
319
            do_save=True,
320
    ) -> User:
321
        if name is not None:
322
            user.display_name = name
323
324
        if email is not None and email != user.email:
325
            self._check_email(email)
326
            user.email = email
327
328
        if password is not None:
329
            user.password = password
330
331
        if timezone is not None:
332
            user.timezone = timezone
333
334
        if lang is not None:
335
            user.lang = lang
336
337
        if groups is not None:
338
            # INFO - G.M - 2018-07-18 - Delete old groups
339
            for group in user.groups:
340
                if group not in groups:
341
                    user.groups.remove(group)
342
            # INFO - G.M - 2018-07-18 - add new groups
343
            for group in groups:
344
                if group not in user.groups:
345
                    user.groups.append(group)
346
347
        if do_save:
348
            self.save(user)
349
350
        return user
351
352
    def create_user(
353
        self,
354
        email,
355
        password: str = None,
356
        name: str = None,
357
        timezone: str = '',
358
        lang: str= None,
359
        groups=[],
360
        do_save: bool=True,
361
        do_notify: bool=True,
362
    ) -> User:
363
        new_user = self.create_minimal_user(email, groups, save_now=False)
364
        self.update(
365
            user=new_user,
366
            name=name,
367
            email=email,
368
            password=password,
369
            timezone=timezone,
370
            lang=lang,
371
            do_save=False,
372
        )
373
        if do_notify:
374
            try:
375
                email_manager = get_email_manager(self._config, self._session)
376
                email_manager.notify_created_account(
377
                    new_user,
378
                    password=password
379
                )
380
            except SMTPException as e:
381
                raise NotificationNotSend()
382
        if do_save:
383
            self.save(new_user)
384
        return new_user
385
386
    def create_minimal_user(
387
            self,
388
            email,
389
            groups=[],
390
            save_now=False
391
    ) -> User:
392
        """Previous create_user method"""
393
        self._check_email(email)
394
        user = User()
395
        user.email = email
396
        user.display_name = email.split('@')[0]
397
398
        for group in groups:
399
            user.groups.append(group)
400
401
        self._session.add(user)
402
403
        if save_now:
404
            self._session.flush()
405
406
        return user
407
408
    def enable(self, user: User, do_save=False):
409
        user.is_active = True
410
        if do_save:
411
            self.save(user)
412
413
    def disable(self, user:User, do_save=False):
414
        user.is_active = False
415
        if do_save:
416
            self.save(user)
417
418
    def delete(self, user: User, do_save=False):
419
        user.is_deleted = True
420
        if do_save:
421
            self.save(user)
422
423
    def undelete(self, user: User, do_save=False):
424
        user.is_deleted = False
425
        if do_save:
426
            self.save(user)
427
428
    def save(self, user: User):
429
        self._session.flush()
430
431
    def execute_created_user_actions(self, created_user: User) -> None:
432
        """
433
        Execute actions when user just been created
434
        :return:
435
        """
436
        # NOTE: Cyclic import
437
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
438
        #from tracim.lib.calendar import CalendarManager
439
        #from tracim.model.organisational import UserCalendar
440
441
        # TODO - G.M - 04-04-2018 - [auth]
442
        # Check if this is already needed with
443
        # new auth system
444
        created_user.ensure_auth_token(
445
            session=self._session,
446
            validity_seconds=self._config.USER_AUTH_TOKEN_VALIDITY
447
        )
448
449
        # Ensure database is up-to-date
450
        self._session.flush()
451
        transaction.commit()
452
453
        # TODO - G.M - 28-03-2018 - [Calendar] Reenable Calendar stuff
454
        # calendar_manager = CalendarManager(created_user)
455
        # calendar_manager.create_then_remove_fake_event(
456
        #     calendar_class=UserCalendar,
457
        #     related_object_id=created_user.user_id,
458
        # )
459